diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index f2cc7163f..7640109a7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -1078,7 +1078,7 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); {error, {unhealthy_target, Message}} -> ?BAD_REQUEST(Message); - {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> + {error, Reason} -> ?BAD_REQUEST(redact(Reason)) end. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 0a080bc3e..efa2c874b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -434,39 +434,71 @@ disable_enable(Action, BridgeType, BridgeName) when %% Manually start connector. This function can speed up reconnection when %% waiting for auto reconnection. The function forwards the start request to -%% its connector. +%% its connector. Returns ok if the status of the bridge is connected after +%% starting the connector. Returns {error, Reason} if the status of the bridge +%% is something else than connected after starting the connector or if an +%% error occurred when the connector was started. +-spec start(term(), term()) -> ok | {error, Reason :: term()}. start(BridgeV2Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun). + connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, true). -connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun) -> +connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) -> connector_operation_helper_with_conf( BridgeV2Type, + Name, lookup_conf(BridgeV2Type, Name), - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ). connector_operation_helper_with_conf( _BridgeV2Type, + _Name, {error, bridge_not_found} = Error, - _ConnectorOpFun + _ConnectorOpFun, + _DoHealthCheck ) -> Error; connector_operation_helper_with_conf( _BridgeV2Type, + _Name, #{enable := false}, - _ConnectorOpFun + _ConnectorOpFun, + _DoHealthCheck ) -> ok; connector_operation_helper_with_conf( BridgeV2Type, + Name, #{connector := ConnectorName}, - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ) -> ConnectorType = connector_type(BridgeV2Type), - ConnectorOpFun(ConnectorType, ConnectorName). + ConnectorOpFunResult = ConnectorOpFun(ConnectorType, ConnectorName), + case {DoHealthCheck, ConnectorOpFunResult} of + {false, _} -> + ConnectorOpFunResult; + {true, {error, Reason}} -> + {error, Reason}; + {true, ok} -> + case health_check(BridgeV2Type, Name) of + #{status := connected} -> + ok; + {error, Reason} -> + {error, Reason}; + #{status := Status, error := Reason} -> + Msg = io_lib:format( + "Connector started but bridge (~s:~s) is not connected. " + "Bridge Status: ~p, Error: ~p", + [bin(BridgeV2Type), bin(Name), Status, Reason] + ), + {error, iolist_to_binary(Msg)} + end + end. reset_metrics(Type, Name) -> reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). @@ -513,6 +545,9 @@ do_send_msg_with_enabled_config( BridgeV2Id = id(BridgeType, BridgeName), emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). +-spec health_check(BridgeType :: term(), BridgeName :: term()) -> + #{status := term(), error := term()} | {error, Reason :: term()}. + health_check(BridgeType, BridgeName) -> case lookup_conf(BridgeType, BridgeName) of #{ @@ -1365,28 +1400,30 @@ bridge_v1_restart(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:restart(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true). bridge_v1_stop(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:stop(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, false). bridge_v1_start(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true). -bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) -> +bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of true -> connector_operation_helper_with_conf( BridgeV2Type, + Name, lookup_conf(BridgeV2Type, Name), - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ); false -> {error, not_bridge_v1_compatible} diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index f495a770b..0e48fe43e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -606,9 +606,7 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> ?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>); {error, {node_not_found, Node}} -> ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); - {error, {unhealthy_target, Message}} -> - ?BAD_REQUEST(Message); - {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> + {error, Reason} -> ?BAD_REQUEST(redact(Reason)) end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 837425888..c6d3d0566 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -806,6 +806,44 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> ), ok. +t_start_operation_when_on_add_channel_gives_error(_Config) -> + Conf = bridge_config(), + BridgeName = my_test_bridge, + emqx_common_test_helpers:with_mock( + emqx_bridge_v2_test_connector, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {error, <<"some_error">>} end, + fun() -> + %% We can crete the bridge event though on_add_channel returns error + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)), + ?assertMatch( + #{ + status := disconnected, + error := <<"some_error">> + }, + emqx_bridge_v2:health_check(bridge_type(), BridgeName) + ), + ?assertMatch( + {ok, #{ + status := disconnected, + error := <<"some_error">> + }}, + emqx_bridge_v2:lookup(bridge_type(), BridgeName) + ), + %% emqx_bridge_v2:start/2 should return ok if bridge if connected after + %% start and otherwise and error + ?assertMatch({error, _}, emqx_bridge_v2:start(bridge_type(), BridgeName)), + %% Let us change on_add_channel to be successful and try again + ok = meck:expect( + emqx_bridge_v2_test_connector, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {ok, #{}} end + ), + ?assertMatch(ok, emqx_bridge_v2:start(bridge_type(), BridgeName)) + end + ), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 484a1a325..73d9728d7 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -147,6 +147,7 @@ emqx, emqx_auth, emqx_management, + emqx_connector, {emqx_bridge, "bridges_v2 {}"}, {emqx_rule_engine, "rule_engine { rules {} }"} ]). @@ -280,6 +281,9 @@ init_mocks() -> meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId) + end), [?CONNECTOR_IMPL, emqx_connector_ee_schema]. clear_resources() -> @@ -504,6 +508,35 @@ do_start_bridge(TestType, Config) -> {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config), + %% Make start bridge fail + expect_on_all_nodes( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end, + Config + ), + ConnectorID = emqx_connector_resource:connector_id(?BRIDGE_TYPE, ?CONNECTOR_NAME), + {ok, 204, <<>>} = emqx_connector_api_SUITE:request( + post, {operation, TestType, stop, ConnectorID}, Config + ), + {ok, 204, <<>>} = emqx_connector_api_SUITE:request( + post, {operation, TestType, start, ConnectorID}, Config + ), + + {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config), + + %% Make start bridge succeed + + expect_on_all_nodes( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {ok, connector_state} end, + Config + ), + + %% try to start again + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), {ok, 200, []} = request_json(get, uri([?ROOT]), Config), @@ -514,6 +547,15 @@ do_start_bridge(TestType, Config) -> {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config), ok. +expect_on_all_nodes(Mod, Function, Fun, Config) -> + case ?config(cluster_nodes, Config) of + undefined -> + ok = meck:expect(Mod, Function, Fun); + Nodes -> + [erpc:call(Node, meck, expect, [Mod, Function, Fun]) || Node <- Nodes] + end, + ok. + %% t_start_stop_inconsistent_bridge_node(Config) -> %% start_stop_inconsistent_bridge(node, Config).