diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 1a1bac140..769145722 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -3,6 +3,7 @@ {emqx_authn,1}. {emqx_authz,1}. {emqx_bridge,1}. +{emqx_bridge,2}. {emqx_broker,1}. {emqx_cm,1}. {emqx_conf,1}. diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index ca0001319..0d4b552ee 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 2e94f0719..04760f021 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -378,7 +378,7 @@ schema("/bridges/:id/metrics/reset") -> description => ?DESC("desc_api6"), parameters => [param_path_id()], responses => #{ - 200 => <<"Reset success">>, + 204 => <<"Reset success">>, 400 => error_schema(['BAD_REQUEST'], "RPC Call Failed") } } @@ -412,9 +412,10 @@ schema("/bridges/:id/:operation") -> param_path_operation_cluster() ], responses => #{ - 200 => <<"Operation success">>, - 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"), - 400 => error_schema('INVALID_ID', "Bad bridge ID") + 204 => <<"Operation success">>, + 400 => error_schema('INVALID_ID', "Bad bridge ID"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } }; @@ -431,9 +432,10 @@ schema("/nodes/:node/bridges/:id/:operation") -> param_path_operation_on_node() ], responses => #{ - 200 => <<"Operation success">>, + 204 => <<"Operation success">>, 400 => error_schema('INVALID_ID', "Bad bridge ID"), 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } @@ -535,7 +537,7 @@ schema("/bridges_probe") -> emqx_bridge_resource:resource_id(BridgeType, BridgeName) ) of - ok -> {200, <<"Reset success">>}; + ok -> {204}; Reason -> {400, error_msg('BAD_REQUEST', Reason)} end ). @@ -607,12 +609,12 @@ lookup_from_local_node(BridgeType, BridgeName) -> }) -> ?TRY_PARSE_ID( Id, - case operation_func(Op) of + case operation_to_all_func(Op) of invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; OperFunc -> Nodes = mria_mnesia:running_nodes(), - operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) + call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName]) end ). @@ -635,38 +637,32 @@ lookup_from_local_node(BridgeType, BridgeName) -> <<"forbidden operation: bridge disabled">> )}; true -> - call_operation(Node, OperFunc, BridgeType, BridgeName) + case emqx_misc:safe_to_existing_atom(Node, utf8) of + {ok, TargetNode} -> + call_operation(TargetNode, OperFunc, [ + TargetNode, BridgeType, BridgeName + ]); + {error, _} -> + {400, error_msg('INVALID_NODE', <<"invalid node">>)} + end end end ). -node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(<<"restart">>) -> restart_bridge_to_node; +node_operation_func(<<"start">>) -> start_bridge_to_node; +node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(_) -> invalid. -operation_func(<<"stop">>) -> stop; -operation_func(<<"restart">>) -> restart; -operation_func(_) -> invalid. +operation_to_all_func(<<"restart">>) -> restart_bridges_to_all_nodes; +operation_to_all_func(<<"start">>) -> start_bridges_to_all_nodes; +operation_to_all_func(<<"stop">>) -> stop_bridges_to_all_nodes; +operation_to_all_func(_) -> invalid. enable_func(<<"true">>) -> enable; enable_func(<<"false">>) -> disable; enable_func(_) -> invalid. -operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> - RpcFunc = - case OperFunc of - restart -> restart_bridges_to_all_nodes; - stop -> stop_bridges_to_all_nodes - end, - case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of - {ok, _} -> - {200}; - {error, [timeout | _]} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, ErrL} -> - {500, error_msg('INTERNAL_ERROR', ErrL)} - end. - ensure_bridge_created(BridgeType, BridgeName, Conf) -> case emqx_bridge:create(BridgeType, BridgeName, Conf) of {ok, _} -> ok; @@ -858,6 +854,12 @@ unpack_bridge_conf(Type, PackedConf) -> #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), RawConf. +is_ok(ok) -> + ok; +is_ok({ok, _} = OkResult) -> + OkResult; +is_ok(Error = {error, _}) -> + Error; is_ok(ResL) -> case lists:filter( @@ -896,36 +898,57 @@ bin(S) when is_atom(S) -> bin(S) when is_binary(S) -> S. -call_operation(Node, OperFunc, BridgeType, BridgeName) -> - case emqx_misc:safe_to_existing_atom(Node, utf8) of - {ok, TargetNode} -> - case - emqx_bridge_proto_v1:OperFunc( - TargetNode, BridgeType, BridgeName - ) - of - ok -> - {200}; - {error, timeout} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, {start_pool_failed, Name, Reason}} -> - {503, - error_msg( - 'SERVICE_UNAVAILABLE', - bin( - io_lib:format( - "failed to start ~p pool for reason ~p", - [Name, Reason] - ) - ) - )}; - {error, Reason} -> - {500, error_msg('INTERNAL_ERROR', Reason)} - end; - {error, _} -> - {400, error_msg('INVALID_NODE', <<"invalid node">>)} +call_operation(NodeOrAll, OperFunc, Args) -> + case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of + ok -> + {204}; + {ok, _} -> + {204}; + {error, not_implemented} -> + %% Should only happen if we call `start` on a node that is + %% still on an older bpapi version that doesn't support it. + maybe_try_restart(NodeOrAll, OperFunc, Args); + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, {start_pool_failed, Name, Reason}} -> + {503, + error_msg( + 'SERVICE_UNAVAILABLE', + bin( + io_lib:format( + "failed to start ~p pool for reason ~p", + [Name, Reason] + ) + ) + )}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end. +maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> + call_operation(all, restart_bridges_to_all_nodes, Args); +maybe_try_restart(Node, start_bridge_to_node, Args) -> + call_operation(Node, restart_bridge_to_node, Args); +maybe_try_restart(_, _, _) -> + {501}. + +do_bpapi_call(all, Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args); +do_bpapi_call(Node, Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). + +do_bpapi_call_vsn(SupportedVersion, Call, Args) -> + case lists:member(SupportedVersion, supported_versions(Call)) of + true -> + apply(emqx_bridge_proto_v2, Call, Args); + false -> + {error, not_implemented} + end. + +supported_versions(start_bridge_to_node) -> [2]; +supported_versions(start_bridges_to_all_nodes) -> [2]; +supported_versions(_Call) -> [1, 2]. + redact(Term) -> emqx_misc:redact(Term). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d228f2281..d2ce7a9d5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge_resource). --include_lib("emqx/include/emqx.hrl"). + -include_lib("emqx/include/logger.hrl"). -export([ @@ -29,18 +29,19 @@ create/2, create/3, create/4, + create_dry_run/2, recreate/2, recreate/3, - create_dry_run/2, remove/1, remove/2, remove/4, + reset_metrics/1, + restart/2, + start/2, + stop/2, update/2, update/3, - update/4, - stop/2, - restart/2, - reset_metrics/1 + update/4 ]). %% bi-directional bridge with producer/consumer or ingress/egress configs @@ -118,12 +119,14 @@ to_type_atom(Type) -> reset_metrics(ResourceId) -> emqx_resource:reset_metrics(ResourceId). +restart(Type, Name) -> + emqx_resource:restart(resource_id(Type, Name)). + stop(Type, Name) -> emqx_resource:stop(resource_id(Type, Name)). -%% we don't provide 'start', as we want an already started bridge to be restarted. -restart(Type, Name) -> - emqx_resource:restart(resource_id(Type, Name)). +start(Type, Name) -> + emqx_resource:start(resource_id(Type, Name)). create(BridgeId, Conf) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl index 52790ca42..88554893b 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, list_bridges/1, restart_bridge_to_node/3, @@ -36,6 +37,9 @@ introduced_in() -> "5.0.0". +deprecated_since() -> + "5.0.17". + -spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). list_bridges(Node) -> rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl new file mode 100644 index 000000000..0fd733380 --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl @@ -0,0 +1,122 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + list_bridges/1, + restart_bridge_to_node/3, + start_bridge_to_node/3, + stop_bridge_to_node/3, + lookup_from_all_nodes/3, + restart_bridges_to_all_nodes/3, + start_bridges_to_all_nodes/3, + stop_bridges_to_all_nodes/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.17". + +-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). +list_bridges(Node) -> + rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec restart_bridge_to_node(node(), key(), key()) -> + term(). +restart_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridge_to_node(node(), key(), key()) -> + term(). +start_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridge_to_node(node(), key(), key()) -> + term(). +stop_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_api, + lookup_from_local_node, + [BridgeType, BridgeName], + ?TIMEOUT + ). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 5cb78d3ba..d6e8708ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -82,11 +82,27 @@ end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]), ok. +init_per_testcase(t_broken_bpapi_vsn, Config) -> + meck:new(emqx_bpapi, [passthrough]), + meck:expect(emqx_bpapi, supported_version, 1, -1), + meck:expect(emqx_bpapi, supported_version, 2, -1), + init_per_testcase(commong, Config); +init_per_testcase(t_old_bpapi_vsn, Config) -> + meck:new(emqx_bpapi, [passthrough]), + meck:expect(emqx_bpapi, supported_version, 1, 1), + meck:expect(emqx_bpapi, supported_version, 2, 1), + init_per_testcase(common, Config); init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. +end_per_testcase(t_broken_bpapi_vsn, Config) -> + meck:unload([emqx_bpapi]), + end_per_testcase(common, Config); +end_per_testcase(t_old_bpapi_vsn, Config) -> + meck:unload([emqx_bpapi]), + end_per_testcase(common, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), @@ -434,6 +450,40 @@ t_cascade_delete_actions(Config) -> {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), ok. +t_broken_bpapi_vsn(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "abc"), + Name = <<"t_bad_bpapi_vsn">>, + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + %% still works since we redirect to 'restart' + {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), + {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), + ok. + +t_old_bpapi_vsn(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "abc"), + Name = <<"t_bad_bpapi_vsn">>, + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + {ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>), + %% still works since we redirect to 'restart' + {ok, 204, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(cluster, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, restart, BridgeID), <<"">>), + ok. + t_start_stop_bridges_node(Config) -> do_start_stop_bridges(node, Config). @@ -463,21 +513,25 @@ do_start_stop_bridges(Type, Config) -> } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% stop it - {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), %% start again - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), + %% start a started bridge + {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), + {ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)), %% restart an already started bridge - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), %% stop it again - {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), %% restart a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), %% delete the bridge @@ -536,7 +590,7 @@ t_enable_disable_bridges(Config) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). t_reset_bridges(Config) -> - %% assert we there's no bridges at first + %% assert there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), Name = ?BRIDGE_NAME, @@ -557,7 +611,7 @@ t_reset_bridges(Config) -> <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), - {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), + {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index db4441d88..4846c8ae6 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -17,7 +17,6 @@ -behaviour(gen_statem). -include("emqx_resource.hrl"). --include("emqx_resource_utils.hrl"). -include_lib("emqx/include/logger.hrl"). % API @@ -327,8 +326,11 @@ handle_event({call, From}, set_resource_status_connecting, _State, Data) -> handle_event({call, From}, restart, _State, Data) -> _ = stop_resource(Data), start_resource(Data, From); -% Called when the resource is to be started -handle_event({call, From}, start, stopped, Data) -> +% Called when the resource is to be started (also used for manual reconnect) +handle_event({call, From}, start, State, Data) when + State =:= stopped orelse + State =:= disconnected +-> start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; diff --git a/changes/v5.0.17/feat-9910.en.md b/changes/v5.0.17/feat-9910.en.md new file mode 100644 index 000000000..bf8e0e1b4 --- /dev/null +++ b/changes/v5.0.17/feat-9910.en.md @@ -0,0 +1 @@ +Add `start` operation to bridges API to allow manual reconnect after failure. diff --git a/changes/v5.0.17/feat-9910.zh.md b/changes/v5.0.17/feat-9910.zh.md new file mode 100644 index 000000000..3e35fff19 --- /dev/null +++ b/changes/v5.0.17/feat-9910.zh.md @@ -0,0 +1 @@ +在桥接 API 中增加 `start` 操作,允许失败后手动重新连接。 diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 10a6d8d9a..17484b948 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -346,9 +346,9 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})), {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), + {ok, 204, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 204, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 204, _} = show(http_post(show(BridgesPartsOpRestart), #{})), %% Cleanup {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)), false = MyKafkaBridgeExists(),