fix(bridge_v2): start operation should return an error when unsuccessful

The bridge V2 HTTP API start operation should return a 400 error if the
start is unsuccessful.

The bridge V1 HTTP API compatibility layer for Bridge V2 should return a
400 error if the start or restart operation is unsuccessful.

This commit fixes the above and adds tests that checks this for the V2
HTTP API.

Fixes:
https://emqx.atlassian.net/browse/EMQX-11304
This commit is contained in:
Kjell Winblad 2023-11-02 18:28:22 +01:00
parent 73dd2f0ffd
commit 36640263ba
5 changed files with 115 additions and 15 deletions

View File

@ -1079,7 +1079,9 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
{error, {unhealthy_target, Message}} ->
?BAD_REQUEST(Message);
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
?BAD_REQUEST(redact(Reason))
?BAD_REQUEST(redact(Reason));
{error, Reason} ->
?BAD_REQUEST(Reason)
end.
maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->

View File

@ -434,39 +434,66 @@ disable_enable(Action, BridgeType, BridgeName) when
%% Manually start connector. This function can speed up reconnection when
%% waiting for auto reconnection. The function forwards the start request to
%% its connector.
%% its connector. Returns ok if the status of the bridge is connected after
%% starting the connector. Returns {error, Reason} if the status of the bridge
%% is something else than connected after starting the connector or if an
%% error occurred when the connector was started.
-spec start(term(), term()) -> ok | {error, Reason :: term()}.
start(BridgeV2Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:start(ConnectorType, ConnectorName)
end,
connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun).
connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, true).
connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun) ->
connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) ->
connector_operation_helper_with_conf(
BridgeV2Type,
Name,
lookup_conf(BridgeV2Type, Name),
ConnectorOpFun
ConnectorOpFun,
DoHealthCheck
).
connector_operation_helper_with_conf(
_BridgeV2Type,
_Name,
{error, bridge_not_found} = Error,
_ConnectorOpFun
_ConnectorOpFun,
_DoHealthCheck
) ->
Error;
connector_operation_helper_with_conf(
_BridgeV2Type,
_Name,
#{enable := false},
_ConnectorOpFun
_ConnectorOpFun,
_DoHealthCheck
) ->
ok;
connector_operation_helper_with_conf(
BridgeV2Type,
Name,
#{connector := ConnectorName},
ConnectorOpFun
ConnectorOpFun,
DoHealthCheck
) ->
ConnectorType = connector_type(BridgeV2Type),
ConnectorOpFun(ConnectorType, ConnectorName).
ConnectorOpFunResult = ConnectorOpFun(ConnectorType, ConnectorName),
case {DoHealthCheck, ConnectorOpFunResult} of
{false, _} ->
ConnectorOpFunResult;
{true, {error, Reason}} ->
{error, Reason};
{true, ok} ->
case health_check(BridgeV2Type, Name) of
#{status := connected} ->
ok;
{error, Reason} ->
{error, Reason};
NonConnectedStatus ->
{error, NonConnectedStatus}
end
end.
reset_metrics(Type, Name) ->
reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
@ -513,6 +540,9 @@ do_send_msg_with_enabled_config(
BridgeV2Id = id(BridgeType, BridgeName),
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
#{status := term(), error := term()} | {error, Reason :: term()}.
health_check(BridgeType, BridgeName) ->
case lookup_conf(BridgeType, BridgeName) of
#{
@ -1365,28 +1395,30 @@ bridge_v1_restart(BridgeV1Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:restart(ConnectorType, ConnectorName)
end,
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true).
bridge_v1_stop(BridgeV1Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:stop(ConnectorType, ConnectorName)
end,
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, false).
bridge_v1_start(BridgeV1Type, Name) ->
ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
emqx_connector_resource:start(ConnectorType, ConnectorName)
end,
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true).
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) ->
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of
true ->
connector_operation_helper_with_conf(
BridgeV2Type,
Name,
lookup_conf(BridgeV2Type, Name),
ConnectorOpFun
ConnectorOpFun,
DoHealthCheck
);
false ->
{error, not_bridge_v1_compatible}

View File

@ -609,7 +609,9 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
{error, {unhealthy_target, Message}} ->
?BAD_REQUEST(Message);
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
?BAD_REQUEST(redact(Reason))
?BAD_REQUEST(redact(Reason));
{error, Reason} ->
?BAD_REQUEST(Reason)
end.
do_bpapi_call(all, Call, Args) ->

View File

@ -806,6 +806,44 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
),
ok.
t_start_operation_when_on_add_channel_gives_error(_Config) ->
Conf = bridge_config(),
BridgeName = my_test_bridge,
emqx_common_test_helpers:with_mock(
emqx_bridge_v2_test_connector,
on_add_channel,
fun(_, _, _ResId, _Channel) -> {error, <<"some_error">>} end,
fun() ->
%% We can crete the bridge event though on_add_channel returns error
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
?assertMatch(
#{
status := disconnected,
error := <<"some_error">>
},
emqx_bridge_v2:health_check(bridge_type(), BridgeName)
),
?assertMatch(
{ok, #{
status := disconnected,
error := <<"some_error">>
}},
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
),
%% emqx_bridge_v2:start/2 should return ok if bridge if connected after
%% start and otherwise and error
?assertMatch({error, _}, emqx_bridge_v2:start(bridge_type(), BridgeName)),
%% Let us change on_add_channel to be successful and try again
ok = meck:expect(
emqx_bridge_v2_test_connector,
on_add_channel,
fun(_, _, _ResId, _Channel) -> {ok, #{}} end
),
?assertMatch(ok, emqx_bridge_v2:start(bridge_type(), BridgeName))
end
),
ok.
%% Helper Functions
wait_until(Fun) ->

View File

@ -280,6 +280,9 @@ init_mocks() ->
meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId)
end),
[?CONNECTOR_IMPL, emqx_connector_ee_schema].
clear_resources() ->
@ -504,6 +507,29 @@ do_start_bridge(TestType, Config) ->
{ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config),
%% Make start bridge fail
ok = meck:expect(
?CONNECTOR_IMPL,
on_add_channel,
fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end
),
ok = emqx_connector_resource:stop(?BRIDGE_TYPE, ?CONNECTOR_NAME),
ok = emqx_connector_resource:start(?BRIDGE_TYPE, ?CONNECTOR_NAME),
{ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config),
%% Make start bridge succeed
ok = meck:expect(
?CONNECTOR_IMPL,
on_add_channel,
fun(_, _, _ResId, _Channel) -> {ok, connector_state} end
),
%% try to start again
{ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config),
{ok, 200, []} = request_json(get, uri([?ROOT]), Config),