From 36640263ba16a2ac47a4d77c6a0b8697966b157d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 2 Nov 2023 18:28:22 +0100 Subject: [PATCH 1/3] fix(bridge_v2): start operation should return an error when unsuccessful The bridge V2 HTTP API start operation should return a 400 error if the start is unsuccessful. The bridge V1 HTTP API compatibility layer for Bridge V2 should return a 400 error if the start or restart operation is unsuccessful. This commit fixes the above and adds tests that checks this for the V2 HTTP API. Fixes: https://emqx.atlassian.net/browse/EMQX-11304 --- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 58 ++++++++++++++----- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 4 +- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 38 ++++++++++++ .../test/emqx_bridge_v2_api_SUITE.erl | 26 +++++++++ 5 files changed, 115 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index f2cc7163f..11f2949c3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -1079,7 +1079,9 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> {error, {unhealthy_target, Message}} -> ?BAD_REQUEST(Message); {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> - ?BAD_REQUEST(redact(Reason)) + ?BAD_REQUEST(redact(Reason)); + {error, Reason} -> + ?BAD_REQUEST(Reason) end. maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 0a080bc3e..3498b690b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -434,39 +434,66 @@ disable_enable(Action, BridgeType, BridgeName) when %% Manually start connector. This function can speed up reconnection when %% waiting for auto reconnection. The function forwards the start request to -%% its connector. +%% its connector. Returns ok if the status of the bridge is connected after +%% starting the connector. Returns {error, Reason} if the status of the bridge +%% is something else than connected after starting the connector or if an +%% error occurred when the connector was started. +-spec start(term(), term()) -> ok | {error, Reason :: term()}. start(BridgeV2Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun). + connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, true). -connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun) -> +connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) -> connector_operation_helper_with_conf( BridgeV2Type, + Name, lookup_conf(BridgeV2Type, Name), - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ). connector_operation_helper_with_conf( _BridgeV2Type, + _Name, {error, bridge_not_found} = Error, - _ConnectorOpFun + _ConnectorOpFun, + _DoHealthCheck ) -> Error; connector_operation_helper_with_conf( _BridgeV2Type, + _Name, #{enable := false}, - _ConnectorOpFun + _ConnectorOpFun, + _DoHealthCheck ) -> ok; connector_operation_helper_with_conf( BridgeV2Type, + Name, #{connector := ConnectorName}, - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ) -> ConnectorType = connector_type(BridgeV2Type), - ConnectorOpFun(ConnectorType, ConnectorName). + ConnectorOpFunResult = ConnectorOpFun(ConnectorType, ConnectorName), + case {DoHealthCheck, ConnectorOpFunResult} of + {false, _} -> + ConnectorOpFunResult; + {true, {error, Reason}} -> + {error, Reason}; + {true, ok} -> + case health_check(BridgeV2Type, Name) of + #{status := connected} -> + ok; + {error, Reason} -> + {error, Reason}; + NonConnectedStatus -> + {error, NonConnectedStatus} + end + end. reset_metrics(Type, Name) -> reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). @@ -513,6 +540,9 @@ do_send_msg_with_enabled_config( BridgeV2Id = id(BridgeType, BridgeName), emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). +-spec health_check(BridgeType :: term(), BridgeName :: term()) -> + #{status := term(), error := term()} | {error, Reason :: term()}. + health_check(BridgeType, BridgeName) -> case lookup_conf(BridgeType, BridgeName) of #{ @@ -1365,28 +1395,30 @@ bridge_v1_restart(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:restart(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true). bridge_v1_stop(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:stop(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, false). bridge_v1_start(BridgeV1Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun). + bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true). -bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) -> +bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of true -> connector_operation_helper_with_conf( BridgeV2Type, + Name, lookup_conf(BridgeV2Type, Name), - ConnectorOpFun + ConnectorOpFun, + DoHealthCheck ); false -> {error, not_bridge_v1_compatible} diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index f495a770b..6f1ff272b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -609,7 +609,9 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> {error, {unhealthy_target, Message}} -> ?BAD_REQUEST(Message); {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> - ?BAD_REQUEST(redact(Reason)) + ?BAD_REQUEST(redact(Reason)); + {error, Reason} -> + ?BAD_REQUEST(Reason) end. do_bpapi_call(all, Call, Args) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 837425888..c6d3d0566 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -806,6 +806,44 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> ), ok. +t_start_operation_when_on_add_channel_gives_error(_Config) -> + Conf = bridge_config(), + BridgeName = my_test_bridge, + emqx_common_test_helpers:with_mock( + emqx_bridge_v2_test_connector, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {error, <<"some_error">>} end, + fun() -> + %% We can crete the bridge event though on_add_channel returns error + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)), + ?assertMatch( + #{ + status := disconnected, + error := <<"some_error">> + }, + emqx_bridge_v2:health_check(bridge_type(), BridgeName) + ), + ?assertMatch( + {ok, #{ + status := disconnected, + error := <<"some_error">> + }}, + emqx_bridge_v2:lookup(bridge_type(), BridgeName) + ), + %% emqx_bridge_v2:start/2 should return ok if bridge if connected after + %% start and otherwise and error + ?assertMatch({error, _}, emqx_bridge_v2:start(bridge_type(), BridgeName)), + %% Let us change on_add_channel to be successful and try again + ok = meck:expect( + emqx_bridge_v2_test_connector, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {ok, #{}} end + ), + ?assertMatch(ok, emqx_bridge_v2:start(bridge_type(), BridgeName)) + end + ), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 484a1a325..96eccf169 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -280,6 +280,9 @@ init_mocks() -> meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId) + end), [?CONNECTOR_IMPL, emqx_connector_ee_schema]. clear_resources() -> @@ -504,6 +507,29 @@ do_start_bridge(TestType, Config) -> {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config), + %% Make start bridge fail + ok = meck:expect( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end + ), + + ok = emqx_connector_resource:stop(?BRIDGE_TYPE, ?CONNECTOR_NAME), + ok = emqx_connector_resource:start(?BRIDGE_TYPE, ?CONNECTOR_NAME), + + {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config), + + %% Make start bridge succeed + + ok = meck:expect( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {ok, connector_state} end + ), + + %% try to start again + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), {ok, 200, []} = request_json(get, uri([?ROOT]), Config), From d14d0fbcb1ec856d283062bc0decf6add02c776b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 3 Nov 2023 06:35:42 +0100 Subject: [PATCH 2/3] fix(bridge_v2 operations): better error message --- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 +--- apps/emqx_bridge/src/emqx_bridge_v2.erl | 9 +++++++-- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 6 +----- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 11f2949c3..7640109a7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -1078,10 +1078,8 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); {error, {unhealthy_target, Message}} -> ?BAD_REQUEST(Message); - {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> - ?BAD_REQUEST(redact(Reason)); {error, Reason} -> - ?BAD_REQUEST(Reason) + ?BAD_REQUEST(redact(Reason)) end. maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 3498b690b..efa2c874b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -490,8 +490,13 @@ connector_operation_helper_with_conf( ok; {error, Reason} -> {error, Reason}; - NonConnectedStatus -> - {error, NonConnectedStatus} + #{status := Status, error := Reason} -> + Msg = io_lib:format( + "Connector started but bridge (~s:~s) is not connected. " + "Bridge Status: ~p, Error: ~p", + [bin(BridgeV2Type), bin(Name), Status, Reason] + ), + {error, iolist_to_binary(Msg)} end end. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 6f1ff272b..0e48fe43e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -606,12 +606,8 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> ?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>); {error, {node_not_found, Node}} -> ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); - {error, {unhealthy_target, Message}} -> - ?BAD_REQUEST(Message); - {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> - ?BAD_REQUEST(redact(Reason)); {error, Reason} -> - ?BAD_REQUEST(Reason) + ?BAD_REQUEST(redact(Reason)) end. do_bpapi_call(all, Call, Args) -> From ebb5997a8cd672a3a6fb8abbd4b57ff210127288 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 3 Nov 2023 11:11:27 +0100 Subject: [PATCH 3/3] test(emqx_bridge_v2_api_SUITE): fix cluster test --- .../test/emqx_bridge_v2_api_SUITE.erl | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 96eccf169..73d9728d7 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -147,6 +147,7 @@ emqx, emqx_auth, emqx_management, + emqx_connector, {emqx_bridge, "bridges_v2 {}"}, {emqx_rule_engine, "rule_engine { rules {} }"} ]). @@ -508,23 +509,29 @@ do_start_bridge(TestType, Config) -> {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config), %% Make start bridge fail - ok = meck:expect( + expect_on_all_nodes( ?CONNECTOR_IMPL, on_add_channel, - fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end + fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end, + Config + ), + ConnectorID = emqx_connector_resource:connector_id(?BRIDGE_TYPE, ?CONNECTOR_NAME), + {ok, 204, <<>>} = emqx_connector_api_SUITE:request( + post, {operation, TestType, stop, ConnectorID}, Config + ), + {ok, 204, <<>>} = emqx_connector_api_SUITE:request( + post, {operation, TestType, start, ConnectorID}, Config ), - - ok = emqx_connector_resource:stop(?BRIDGE_TYPE, ?CONNECTOR_NAME), - ok = emqx_connector_resource:start(?BRIDGE_TYPE, ?CONNECTOR_NAME), {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config), %% Make start bridge succeed - ok = meck:expect( + expect_on_all_nodes( ?CONNECTOR_IMPL, on_add_channel, - fun(_, _, _ResId, _Channel) -> {ok, connector_state} end + fun(_, _, _ResId, _Channel) -> {ok, connector_state} end, + Config ), %% try to start again @@ -540,6 +547,15 @@ do_start_bridge(TestType, Config) -> {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config), ok. +expect_on_all_nodes(Mod, Function, Fun, Config) -> + case ?config(cluster_nodes, Config) of + undefined -> + ok = meck:expect(Mod, Function, Fun); + Nodes -> + [erpc:call(Node, meck, expect, [Mod, Function, Fun]) || Node <- Nodes] + end, + ok. + %% t_start_stop_inconsistent_bridge_node(Config) -> %% start_stop_inconsistent_bridge(node, Config).