Merge pull request #9910 from sstrigler/EMQX-8861-improve-bridge-restart-button-behaviour

EMQX 8861 improve bridge restart button behaviour
This commit is contained in:
Zaiming (Stone) Shi 2023-02-09 18:00:48 +01:00 committed by GitHub
commit 42dfaf3ef2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 290 additions and 79 deletions

View File

@ -3,6 +3,7 @@
{emqx_authn,1}. {emqx_authn,1}.
{emqx_authz,1}. {emqx_authz,1}.
{emqx_bridge,1}. {emqx_bridge,1}.
{emqx_bridge,2}.
{emqx_broker,1}. {emqx_broker,1}.
{emqx_cm,1}. {emqx_cm,1}.
{emqx_conf,1}. {emqx_conf,1}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge, [ {application, emqx_bridge, [
{description, "EMQX bridges"}, {description, "EMQX bridges"},
{vsn, "0.1.10"}, {vsn, "0.1.11"},
{registered, []}, {registered, []},
{mod, {emqx_bridge_app, []}}, {mod, {emqx_bridge_app, []}},
{applications, [ {applications, [

View File

@ -378,7 +378,7 @@ schema("/bridges/:id/metrics/reset") ->
description => ?DESC("desc_api6"), description => ?DESC("desc_api6"),
parameters => [param_path_id()], parameters => [param_path_id()],
responses => #{ responses => #{
200 => <<"Reset success">>, 204 => <<"Reset success">>,
400 => error_schema(['BAD_REQUEST'], "RPC Call Failed") 400 => error_schema(['BAD_REQUEST'], "RPC Call Failed")
} }
} }
@ -412,9 +412,10 @@ schema("/bridges/:id/:operation") ->
param_path_operation_cluster() param_path_operation_cluster()
], ],
responses => #{ responses => #{
200 => <<"Operation success">>, 204 => <<"Operation success">>,
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"), 400 => error_schema('INVALID_ID', "Bad bridge ID"),
400 => error_schema('INVALID_ID', "Bad bridge ID") 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
} }
} }
}; };
@ -431,9 +432,10 @@ schema("/nodes/:node/bridges/:id/:operation") ->
param_path_operation_on_node() param_path_operation_on_node()
], ],
responses => #{ responses => #{
200 => <<"Operation success">>, 204 => <<"Operation success">>,
400 => error_schema('INVALID_ID', "Bad bridge ID"), 400 => error_schema('INVALID_ID', "Bad bridge ID"),
403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"),
501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
} }
} }
@ -535,7 +537,7 @@ schema("/bridges_probe") ->
emqx_bridge_resource:resource_id(BridgeType, BridgeName) emqx_bridge_resource:resource_id(BridgeType, BridgeName)
) )
of of
ok -> {200, <<"Reset success">>}; ok -> {204};
Reason -> {400, error_msg('BAD_REQUEST', Reason)} Reason -> {400, error_msg('BAD_REQUEST', Reason)}
end end
). ).
@ -607,12 +609,12 @@ lookup_from_local_node(BridgeType, BridgeName) ->
}) -> }) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
case operation_func(Op) of case operation_to_all_func(Op) of
invalid -> invalid ->
{400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
OperFunc -> OperFunc ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName])
end end
). ).
@ -635,38 +637,32 @@ lookup_from_local_node(BridgeType, BridgeName) ->
<<"forbidden operation: bridge disabled">> <<"forbidden operation: bridge disabled">>
)}; )};
true -> true ->
call_operation(Node, OperFunc, BridgeType, BridgeName) case emqx_misc:safe_to_existing_atom(Node, utf8) of
{ok, TargetNode} ->
call_operation(TargetNode, OperFunc, [
TargetNode, BridgeType, BridgeName
]);
{error, _} ->
{400, error_msg('INVALID_NODE', <<"invalid node">>)}
end
end end
end end
). ).
node_operation_func(<<"stop">>) -> stop_bridge_to_node;
node_operation_func(<<"restart">>) -> restart_bridge_to_node; node_operation_func(<<"restart">>) -> restart_bridge_to_node;
node_operation_func(<<"start">>) -> start_bridge_to_node;
node_operation_func(<<"stop">>) -> stop_bridge_to_node;
node_operation_func(_) -> invalid. node_operation_func(_) -> invalid.
operation_func(<<"stop">>) -> stop; operation_to_all_func(<<"restart">>) -> restart_bridges_to_all_nodes;
operation_func(<<"restart">>) -> restart; operation_to_all_func(<<"start">>) -> start_bridges_to_all_nodes;
operation_func(_) -> invalid. operation_to_all_func(<<"stop">>) -> stop_bridges_to_all_nodes;
operation_to_all_func(_) -> invalid.
enable_func(<<"true">>) -> enable; enable_func(<<"true">>) -> enable;
enable_func(<<"false">>) -> disable; enable_func(<<"false">>) -> disable;
enable_func(_) -> invalid. enable_func(_) -> invalid.
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
RpcFunc =
case OperFunc of
restart -> restart_bridges_to_all_nodes;
stop -> stop_bridges_to_all_nodes
end,
case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
{ok, _} ->
{200};
{error, [timeout | _]} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, ErrL} ->
{500, error_msg('INTERNAL_ERROR', ErrL)}
end.
ensure_bridge_created(BridgeType, BridgeName, Conf) -> ensure_bridge_created(BridgeType, BridgeName, Conf) ->
case emqx_bridge:create(BridgeType, BridgeName, Conf) of case emqx_bridge:create(BridgeType, BridgeName, Conf) of
{ok, _} -> ok; {ok, _} -> ok;
@ -858,6 +854,12 @@ unpack_bridge_conf(Type, PackedConf) ->
#{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges),
RawConf. RawConf.
is_ok(ok) ->
ok;
is_ok({ok, _} = OkResult) ->
OkResult;
is_ok(Error = {error, _}) ->
Error;
is_ok(ResL) -> is_ok(ResL) ->
case case
lists:filter( lists:filter(
@ -896,36 +898,57 @@ bin(S) when is_atom(S) ->
bin(S) when is_binary(S) -> bin(S) when is_binary(S) ->
S. S.
call_operation(Node, OperFunc, BridgeType, BridgeName) -> call_operation(NodeOrAll, OperFunc, Args) ->
case emqx_misc:safe_to_existing_atom(Node, utf8) of case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
{ok, TargetNode} -> ok ->
case {204};
emqx_bridge_proto_v1:OperFunc( {ok, _} ->
TargetNode, BridgeType, BridgeName {204};
) {error, not_implemented} ->
of %% Should only happen if we call `start` on a node that is
ok -> %% still on an older bpapi version that doesn't support it.
{200}; maybe_try_restart(NodeOrAll, OperFunc, Args);
{error, timeout} -> {error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, {start_pool_failed, Name, Reason}} -> {error, {start_pool_failed, Name, Reason}} ->
{503, {503,
error_msg( error_msg(
'SERVICE_UNAVAILABLE', 'SERVICE_UNAVAILABLE',
bin( bin(
io_lib:format( io_lib:format(
"failed to start ~p pool for reason ~p", "failed to start ~p pool for reason ~p",
[Name, Reason] [Name, Reason]
) )
) )
)}; )};
{error, Reason} -> {error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)} {500, error_msg('INTERNAL_ERROR', Reason)}
end;
{error, _} ->
{400, error_msg('INVALID_NODE', <<"invalid node">>)}
end. end.
maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
call_operation(all, restart_bridges_to_all_nodes, Args);
maybe_try_restart(Node, start_bridge_to_node, Args) ->
call_operation(Node, restart_bridge_to_node, Args);
maybe_try_restart(_, _, _) ->
{501}.
do_bpapi_call(all, Call, Args) ->
do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args);
do_bpapi_call(Node, Call, Args) ->
do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args).
do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
case lists:member(SupportedVersion, supported_versions(Call)) of
true ->
apply(emqx_bridge_proto_v2, Call, Args);
false ->
{error, not_implemented}
end.
supported_versions(start_bridge_to_node) -> [2];
supported_versions(start_bridges_to_all_nodes) -> [2];
supported_versions(_Call) -> [1, 2].
redact(Term) -> redact(Term) ->
emqx_misc:redact(Term). emqx_misc:redact(Term).

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_resource). -module(emqx_bridge_resource).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-export([ -export([
@ -29,18 +29,19 @@
create/2, create/2,
create/3, create/3,
create/4, create/4,
create_dry_run/2,
recreate/2, recreate/2,
recreate/3, recreate/3,
create_dry_run/2,
remove/1, remove/1,
remove/2, remove/2,
remove/4, remove/4,
reset_metrics/1,
restart/2,
start/2,
stop/2,
update/2, update/2,
update/3, update/3,
update/4, update/4
stop/2,
restart/2,
reset_metrics/1
]). ]).
%% bi-directional bridge with producer/consumer or ingress/egress configs %% bi-directional bridge with producer/consumer or ingress/egress configs
@ -118,12 +119,14 @@ to_type_atom(Type) ->
reset_metrics(ResourceId) -> reset_metrics(ResourceId) ->
emqx_resource:reset_metrics(ResourceId). emqx_resource:reset_metrics(ResourceId).
restart(Type, Name) ->
emqx_resource:restart(resource_id(Type, Name)).
stop(Type, Name) -> stop(Type, Name) ->
emqx_resource:stop(resource_id(Type, Name)). emqx_resource:stop(resource_id(Type, Name)).
%% we don't provide 'start', as we want an already started bridge to be restarted. start(Type, Name) ->
restart(Type, Name) -> emqx_resource:start(resource_id(Type, Name)).
emqx_resource:restart(resource_id(Type, Name)).
create(BridgeId, Conf) -> create(BridgeId, Conf) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId), {BridgeType, BridgeName} = parse_bridge_id(BridgeId),

View File

@ -20,6 +20,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
list_bridges/1, list_bridges/1,
restart_bridge_to_node/3, restart_bridge_to_node/3,
@ -36,6 +37,9 @@
introduced_in() -> introduced_in() ->
"5.0.0". "5.0.0".
deprecated_since() ->
"5.0.17".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). -spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) -> list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).

View File

@ -0,0 +1,122 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
list_bridges/1,
restart_bridge_to_node/3,
start_bridge_to_node/3,
stop_bridge_to_node/3,
lookup_from_all_nodes/3,
restart_bridges_to_all_nodes/3,
start_bridges_to_all_nodes/3,
stop_bridges_to_all_nodes/3
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.0.17".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
-type key() :: atom() | binary() | [byte()].
-spec restart_bridge_to_node(node(), key(), key()) ->
term().
restart_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridge_to_node(node(), key(), key()) ->
term().
start_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridge_to_node(node(), key(), key()) ->
term().
stop_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
lookup_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).

View File

@ -82,11 +82,27 @@ end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]), emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]),
ok. ok.
init_per_testcase(t_broken_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, -1),
meck:expect(emqx_bpapi, supported_version, 2, -1),
init_per_testcase(commong, Config);
init_per_testcase(t_old_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, 1),
meck:expect(emqx_bpapi, supported_version, 2, 1),
init_per_testcase(common, Config);
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
{Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
[{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
end_per_testcase(t_broken_bpapi_vsn, Config) ->
meck:unload([emqx_bpapi]),
end_per_testcase(common, Config);
end_per_testcase(t_old_bpapi_vsn, Config) ->
meck:unload([emqx_bpapi]),
end_per_testcase(common, Config);
end_per_testcase(_, Config) -> end_per_testcase(_, Config) ->
Sock = ?config(sock, Config), Sock = ?config(sock, Config),
Acceptor = ?config(acceptor, Config), Acceptor = ?config(acceptor, Config),
@ -434,6 +450,40 @@ t_cascade_delete_actions(Config) ->
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
ok. ok.
t_broken_bpapi_vsn(Config) ->
Port = ?config(port, Config),
URL1 = ?URL(Port, "abc"),
Name = <<"t_bad_bpapi_vsn">>,
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% still works since we redirect to 'restart'
{ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
{ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>),
ok.
t_old_bpapi_vsn(Config) ->
Port = ?config(port, Config),
URL1 = ?URL(Port, "abc"),
Name = <<"t_bad_bpapi_vsn">>,
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
{ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>),
%% still works since we redirect to 'restart'
{ok, 204, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(cluster, restart, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, restart, BridgeID), <<"">>),
ok.
t_start_stop_bridges_node(Config) -> t_start_stop_bridges_node(Config) ->
do_start_stop_bridges(node, Config). do_start_stop_bridges(node, Config).
@ -463,21 +513,25 @@ do_start_stop_bridges(Type, Config) ->
} = jsx:decode(Bridge), } = jsx:decode(Bridge),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% stop it %% stop it
{ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
%% start again %% start again
{ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
%% start a started bridge
{ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
{ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)),
%% restart an already started bridge %% restart an already started bridge
{ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
%% stop it again %% stop it again
{ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
%% restart a stopped bridge %% restart a stopped bridge
{ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)),
%% delete the bridge %% delete the bridge
@ -536,7 +590,7 @@ t_enable_disable_bridges(Config) ->
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
t_reset_bridges(Config) -> t_reset_bridges(Config) ->
%% assert we there's no bridges at first %% assert there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
Name = ?BRIDGE_NAME, Name = ?BRIDGE_NAME,
@ -557,7 +611,7 @@ t_reset_bridges(Config) ->
<<"url">> := URL1 <<"url">> := URL1
} = jsx:decode(Bridge), } = jsx:decode(Bridge),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
{ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),

View File

@ -17,7 +17,6 @@
-behaviour(gen_statem). -behaviour(gen_statem).
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include("emqx_resource_utils.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
% API % API
@ -327,8 +326,11 @@ handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
handle_event({call, From}, restart, _State, Data) -> handle_event({call, From}, restart, _State, Data) ->
_ = stop_resource(Data), _ = stop_resource(Data),
start_resource(Data, From); start_resource(Data, From);
% Called when the resource is to be started % Called when the resource is to be started (also used for manual reconnect)
handle_event({call, From}, start, stopped, Data) -> handle_event({call, From}, start, State, Data) when
State =:= stopped orelse
State =:= disconnected
->
start_resource(Data, From); start_resource(Data, From);
handle_event({call, From}, start, _State, _Data) -> handle_event({call, From}, start, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};

View File

@ -0,0 +1 @@
Add `start` operation to bridges API to allow manual reconnect after failure.

View File

@ -0,0 +1 @@
在桥接 API 中增加 `start` 操作,允许失败后手动重新连接。

View File

@ -346,9 +346,9 @@ kafka_bridge_rest_api_helper(Config) ->
{ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})), {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})),
{ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})),
{ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 204, _} = show(http_post(show(BridgesPartsOpStop), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 204, _} = show(http_post(show(BridgesPartsOpStop), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), {ok, 204, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
%% Cleanup %% Cleanup
{ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)), {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)),
false = MyKafkaBridgeExists(), false = MyKafkaBridgeExists(),