From 4e12a44ee6ab37795517969b871bcbb3db481e91 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 3 Feb 2023 15:19:36 +0100 Subject: [PATCH 1/7] fix: return 204 instead of 200 if there's no body content --- apps/emqx_bridge/src/emqx_bridge_api.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 2e94f0719..dce99646f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -378,7 +378,7 @@ schema("/bridges/:id/metrics/reset") -> description => ?DESC("desc_api6"), parameters => [param_path_id()], responses => #{ - 200 => <<"Reset success">>, + 204 => <<"Reset success">>, 400 => error_schema(['BAD_REQUEST'], "RPC Call Failed") } } @@ -412,7 +412,7 @@ schema("/bridges/:id/:operation") -> param_path_operation_cluster() ], responses => #{ - 200 => <<"Operation success">>, + 204 => <<"Operation success">>, 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"), 400 => error_schema('INVALID_ID', "Bad bridge ID") } @@ -431,7 +431,7 @@ schema("/nodes/:node/bridges/:id/:operation") -> param_path_operation_on_node() ], responses => #{ - 200 => <<"Operation success">>, + 204 => <<"Operation success">>, 400 => error_schema('INVALID_ID', "Bad bridge ID"), 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") @@ -535,7 +535,7 @@ schema("/bridges_probe") -> emqx_bridge_resource:resource_id(BridgeType, BridgeName) ) of - ok -> {200, <<"Reset success">>}; + ok -> {204}; Reason -> {400, error_msg('BAD_REQUEST', Reason)} end ). @@ -660,7 +660,7 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> end, case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of {ok, _} -> - {200}; + {204}; {error, [timeout | _]} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, ErrL} -> @@ -905,7 +905,7 @@ call_operation(Node, OperFunc, BridgeType, BridgeName) -> ) of ok -> - {200}; + {204}; {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, {start_pool_failed, Name, Reason}} -> From 86f3f5787fc8b5c84fe3f5c392083842ea44cdd8 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 3 Feb 2023 15:20:30 +0100 Subject: [PATCH 2/7] feat: allow to manually re-connect disconected bridge --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 41 ++++-- apps/emqx_bridge/src/emqx_bridge_resource.erl | 6 +- .../src/proto/emqx_bridge_proto_v1.erl | 4 + .../src/proto/emqx_bridge_proto_v2.erl | 122 ++++++++++++++++++ .../test/emqx_bridge_api_SUITE.erl | 40 +++++- apps/emqx_resource/src/emqx_resource.app.src | 2 +- .../src/emqx_resource_manager.erl | 8 +- changes/v5.0.17/feat-9910.en.md | 1 + changes/v5.0.17/feat-9910.zh.md | 1 + 11 files changed, 207 insertions(+), 21 deletions(-) create mode 100644 apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl create mode 100644 changes/v5.0.17/feat-9910.en.md create mode 100644 changes/v5.0.17/feat-9910.zh.md diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 1a1bac140..769145722 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -3,6 +3,7 @@ {emqx_authn,1}. {emqx_authz,1}. {emqx_bridge,1}. +{emqx_bridge,2}. {emqx_broker,1}. {emqx_cm,1}. {emqx_conf,1}. diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index ca0001319..0d4b552ee 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index dce99646f..8711535ce 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -413,8 +413,9 @@ schema("/bridges/:id/:operation") -> ], responses => #{ 204 => <<"Operation success">>, - 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"), - 400 => error_schema('INVALID_ID', "Bad bridge ID") + 400 => error_schema('INVALID_ID', "Bad bridge ID"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } }; @@ -434,6 +435,7 @@ schema("/nodes/:node/bridges/:id/:operation") -> 204 => <<"Operation success">>, 400 => error_schema('INVALID_ID', "Bad bridge ID"), 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } @@ -640,10 +642,12 @@ lookup_from_local_node(BridgeType, BridgeName) -> end ). +node_operation_func(<<"start">>) -> start_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(<<"restart">>) -> restart_bridge_to_node; node_operation_func(_) -> invalid. +operation_func(<<"start">>) -> start; operation_func(<<"stop">>) -> stop; operation_func(<<"restart">>) -> restart; operation_func(_) -> invalid. @@ -656,11 +660,14 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> RpcFunc = case OperFunc of restart -> restart_bridges_to_all_nodes; + start -> start_bridges_to_all_nodes; stop -> stop_bridges_to_all_nodes end, - case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of + case is_ok(do_bpapi_call(RpcFunc, [Nodes, BridgeType, BridgeName])) of {ok, _} -> {204}; + {error, not_implemented} -> + {501}; {error, [timeout | _]} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, ErrL} -> @@ -858,6 +865,8 @@ unpack_bridge_conf(Type, PackedConf) -> #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), RawConf. +is_ok(Error = {error, _}) -> + Error; is_ok(ResL) -> case lists:filter( @@ -899,13 +908,11 @@ bin(S) when is_binary(S) -> call_operation(Node, OperFunc, BridgeType, BridgeName) -> case emqx_misc:safe_to_existing_atom(Node, utf8) of {ok, TargetNode} -> - case - emqx_bridge_proto_v1:OperFunc( - TargetNode, BridgeType, BridgeName - ) - of + case do_bpapi_call(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]) of ok -> {204}; + {error, not_implemented} -> + {501}; {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, {start_pool_failed, Name, Reason}} -> @@ -926,6 +933,24 @@ call_operation(Node, OperFunc, BridgeType, BridgeName) -> {400, error_msg('INVALID_NODE', <<"invalid node">>)} end. +do_bpapi_call(Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args). + +do_bpapi_call(Node, Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). + +do_bpapi_call_vsn(SupportedVersion, Call, Args) -> + case lists:member(SupportedVersion, supported_versions(Call)) of + true -> + apply(emqx_bridge_proto_v2, Call, Args); + false -> + {error, not_implemented} + end. + +supported_versions(start_bridge_to_node) -> [2]; +supported_versions(start_bridges_to_all_nodes) -> [2]; +supported_versions(_Call) -> [1, 2]. + redact(Term) -> emqx_misc:redact(Term). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d228f2281..6cfe40bc3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge_resource). --include_lib("emqx/include/emqx.hrl"). + -include_lib("emqx/include/logger.hrl"). -export([ @@ -38,6 +38,7 @@ update/2, update/3, update/4, + start/2, stop/2, restart/2, reset_metrics/1 @@ -121,6 +122,9 @@ reset_metrics(ResourceId) -> stop(Type, Name) -> emqx_resource:stop(resource_id(Type, Name)). +start(Type, Name) -> + emqx_resource:start(resource_id(Type, Name)). + %% we don't provide 'start', as we want an already started bridge to be restarted. restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl index 52790ca42..88554893b 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, list_bridges/1, restart_bridge_to_node/3, @@ -36,6 +37,9 @@ introduced_in() -> "5.0.0". +deprecated_since() -> + "5.0.17". + -spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). list_bridges(Node) -> rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl new file mode 100644 index 000000000..0fd733380 --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl @@ -0,0 +1,122 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + list_bridges/1, + restart_bridge_to_node/3, + start_bridge_to_node/3, + stop_bridge_to_node/3, + lookup_from_all_nodes/3, + restart_bridges_to_all_nodes/3, + start_bridges_to_all_nodes/3, + stop_bridges_to_all_nodes/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.17". + +-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). +list_bridges(Node) -> + rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec restart_bridge_to_node(node(), key(), key()) -> + term(). +restart_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridge_to_node(node(), key(), key()) -> + term(). +start_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridge_to_node(node(), key(), key()) -> + term(). +stop_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_api, + lookup_from_local_node, + [BridgeType, BridgeName], + ?TIMEOUT + ). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 5cb78d3ba..bf09cf7f3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -82,11 +82,19 @@ end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]), ok. +init_per_testcase(t_bad_bpapi_vsn, Config) -> + meck:new(emqx_bpapi, [passthrough]), + meck:expect(emqx_bpapi, supported_version, 1, 1), + meck:expect(emqx_bpapi, supported_version, 2, 1), + init_per_testcase(commong, Config); init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. +end_per_testcase(t_bad_bpapi_vsn, Config) -> + meck:unload([emqx_bpapi]), + end_per_testcase(commong, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), @@ -440,6 +448,20 @@ t_start_stop_bridges_node(Config) -> t_start_stop_bridges_cluster(Config) -> do_start_stop_bridges(cluster, Config). +t_bad_bpapi_vsn(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "abc"), + Name = <<"t_bad_bpapi_vsn">>, + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), + {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), + ok. + do_start_stop_bridges(Type, Config) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -463,21 +485,25 @@ do_start_stop_bridges(Type, Config) -> } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% stop it - {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), %% start again - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), + %% start a started bridge + {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), + {ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)), %% restart an already started bridge - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), %% stop it again - {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), %% restart a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), %% delete the bridge @@ -536,7 +562,7 @@ t_enable_disable_bridges(Config) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). t_reset_bridges(Config) -> - %% assert we there's no bridges at first + %% assert there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), Name = ?BRIDGE_NAME, @@ -557,7 +583,7 @@ t_reset_bridges(Config) -> <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), - {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), + {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 6bd9fd213..4618e94a6 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index db4441d88..4846c8ae6 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -17,7 +17,6 @@ -behaviour(gen_statem). -include("emqx_resource.hrl"). --include("emqx_resource_utils.hrl"). -include_lib("emqx/include/logger.hrl"). % API @@ -327,8 +326,11 @@ handle_event({call, From}, set_resource_status_connecting, _State, Data) -> handle_event({call, From}, restart, _State, Data) -> _ = stop_resource(Data), start_resource(Data, From); -% Called when the resource is to be started -handle_event({call, From}, start, stopped, Data) -> +% Called when the resource is to be started (also used for manual reconnect) +handle_event({call, From}, start, State, Data) when + State =:= stopped orelse + State =:= disconnected +-> start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; diff --git a/changes/v5.0.17/feat-9910.en.md b/changes/v5.0.17/feat-9910.en.md new file mode 100644 index 000000000..bf8e0e1b4 --- /dev/null +++ b/changes/v5.0.17/feat-9910.en.md @@ -0,0 +1 @@ +Add `start` operation to bridges API to allow manual reconnect after failure. diff --git a/changes/v5.0.17/feat-9910.zh.md b/changes/v5.0.17/feat-9910.zh.md new file mode 100644 index 000000000..f165a44b9 --- /dev/null +++ b/changes/v5.0.17/feat-9910.zh.md @@ -0,0 +1 @@ +在桥梁 API 中增加 `start` "操作,允许失败后手动重新连接。 From c7535fce566af24b10edc5a72dde2cf2d9fb5ccd Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Feb 2023 09:14:35 +0100 Subject: [PATCH 3/7] test: fix test expecting 200 instead of 204 --- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 10a6d8d9a..17484b948 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -346,9 +346,9 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})), {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), + {ok, 204, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 204, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 204, _} = show(http_post(show(BridgesPartsOpRestart), #{})), %% Cleanup {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)), false = MyKafkaBridgeExists(), From c407ee3c3b10074442b8e3fc763a627bf50d5ce3 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Feb 2023 09:19:20 +0100 Subject: [PATCH 4/7] style: fix zn changelog --- changes/v5.0.17/feat-9910.zh.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/v5.0.17/feat-9910.zh.md b/changes/v5.0.17/feat-9910.zh.md index f165a44b9..3e35fff19 100644 --- a/changes/v5.0.17/feat-9910.zh.md +++ b/changes/v5.0.17/feat-9910.zh.md @@ -1 +1 @@ -在桥梁 API 中增加 `start` "操作,允许失败后手动重新连接。 +在桥接 API 中增加 `start` 操作,允许失败后手动重新连接。 From 502f62e18dc17597ecae175ceac4a6d9d7572179 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Feb 2023 13:19:41 +0100 Subject: [PATCH 5/7] fix: try old 'restart' behavior if 'start' is not implemented --- apps/emqx_bridge/src/emqx_bridge_api.erl | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8711535ce..ebb529904 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -642,14 +642,14 @@ lookup_from_local_node(BridgeType, BridgeName) -> end ). +node_operation_func(<<"restart">>) -> restart_bridge_to_node; node_operation_func(<<"start">>) -> start_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node; -node_operation_func(<<"restart">>) -> restart_bridge_to_node; node_operation_func(_) -> invalid. +operation_func(<<"restart">>) -> restart; operation_func(<<"start">>) -> start; operation_func(<<"stop">>) -> stop; -operation_func(<<"restart">>) -> restart; operation_func(_) -> invalid. enable_func(<<"true">>) -> enable; @@ -667,13 +667,20 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> {ok, _} -> {204}; {error, not_implemented} -> - {501}; + %% As of now this can only happen when we call 'start' on nodes + %% that run on an older proto version. + maybe_try_restart(Nodes, OperFunc, BridgeType, BridgeName); {error, [timeout | _]} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, ErrL} -> {500, error_msg('INTERNAL_ERROR', ErrL)} end. +maybe_try_restart(Nodes, start, BridgeType, BridgeName) -> + operation_to_all_nodes(Nodes, restart, BridgeType, BridgeName); +maybe_try_restart(_, _, _, _) -> + {501}. + ensure_bridge_created(BridgeType, BridgeName, Conf) -> case emqx_bridge:create(BridgeType, BridgeName, Conf) of {ok, _} -> ok; @@ -912,7 +919,9 @@ call_operation(Node, OperFunc, BridgeType, BridgeName) -> ok -> {204}; {error, not_implemented} -> - {501}; + %% Should only happen if we call `start` on a node that is + %% still on an older bpapi version that doesn't support it. + maybe_try_restart_node(Node, OperFunc, BridgeType, BridgeName); {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, {start_pool_failed, Name, Reason}} -> @@ -933,6 +942,11 @@ call_operation(Node, OperFunc, BridgeType, BridgeName) -> {400, error_msg('INVALID_NODE', <<"invalid node">>)} end. +maybe_try_restart_node(Node, start_bridge_to_node, BridgeType, BridgeName) -> + call_operation(Node, restart_bridge_to_node, BridgeType, BridgeName); +maybe_try_restart_node(_, _, _, _) -> + {501}. + do_bpapi_call(Call, Args) -> do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args). From a3fd0897bc23ea1ef06b9d904ffc657f250fd38e Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Feb 2023 13:39:45 +0100 Subject: [PATCH 6/7] refactor: less code duplication --- apps/emqx_bridge/src/emqx_bridge_api.erl | 116 ++++++++---------- .../test/emqx_bridge_api_SUITE.erl | 50 ++++++-- 2 files changed, 89 insertions(+), 77 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ebb529904..04760f021 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -609,12 +609,12 @@ lookup_from_local_node(BridgeType, BridgeName) -> }) -> ?TRY_PARSE_ID( Id, - case operation_func(Op) of + case operation_to_all_func(Op) of invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; OperFunc -> Nodes = mria_mnesia:running_nodes(), - operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) + call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName]) end ). @@ -637,7 +637,14 @@ lookup_from_local_node(BridgeType, BridgeName) -> <<"forbidden operation: bridge disabled">> )}; true -> - call_operation(Node, OperFunc, BridgeType, BridgeName) + case emqx_misc:safe_to_existing_atom(Node, utf8) of + {ok, TargetNode} -> + call_operation(TargetNode, OperFunc, [ + TargetNode, BridgeType, BridgeName + ]); + {error, _} -> + {400, error_msg('INVALID_NODE', <<"invalid node">>)} + end end end ). @@ -647,40 +654,15 @@ node_operation_func(<<"start">>) -> start_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(_) -> invalid. -operation_func(<<"restart">>) -> restart; -operation_func(<<"start">>) -> start; -operation_func(<<"stop">>) -> stop; -operation_func(_) -> invalid. +operation_to_all_func(<<"restart">>) -> restart_bridges_to_all_nodes; +operation_to_all_func(<<"start">>) -> start_bridges_to_all_nodes; +operation_to_all_func(<<"stop">>) -> stop_bridges_to_all_nodes; +operation_to_all_func(_) -> invalid. enable_func(<<"true">>) -> enable; enable_func(<<"false">>) -> disable; enable_func(_) -> invalid. -operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> - RpcFunc = - case OperFunc of - restart -> restart_bridges_to_all_nodes; - start -> start_bridges_to_all_nodes; - stop -> stop_bridges_to_all_nodes - end, - case is_ok(do_bpapi_call(RpcFunc, [Nodes, BridgeType, BridgeName])) of - {ok, _} -> - {204}; - {error, not_implemented} -> - %% As of now this can only happen when we call 'start' on nodes - %% that run on an older proto version. - maybe_try_restart(Nodes, OperFunc, BridgeType, BridgeName); - {error, [timeout | _]} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, ErrL} -> - {500, error_msg('INTERNAL_ERROR', ErrL)} - end. - -maybe_try_restart(Nodes, start, BridgeType, BridgeName) -> - operation_to_all_nodes(Nodes, restart, BridgeType, BridgeName); -maybe_try_restart(_, _, _, _) -> - {501}. - ensure_bridge_created(BridgeType, BridgeName, Conf) -> case emqx_bridge:create(BridgeType, BridgeName, Conf) of {ok, _} -> ok; @@ -872,6 +854,10 @@ unpack_bridge_conf(Type, PackedConf) -> #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), RawConf. +is_ok(ok) -> + ok; +is_ok({ok, _} = OkResult) -> + OkResult; is_ok(Error = {error, _}) -> Error; is_ok(ResL) -> @@ -912,44 +898,42 @@ bin(S) when is_atom(S) -> bin(S) when is_binary(S) -> S. -call_operation(Node, OperFunc, BridgeType, BridgeName) -> - case emqx_misc:safe_to_existing_atom(Node, utf8) of - {ok, TargetNode} -> - case do_bpapi_call(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]) of - ok -> - {204}; - {error, not_implemented} -> - %% Should only happen if we call `start` on a node that is - %% still on an older bpapi version that doesn't support it. - maybe_try_restart_node(Node, OperFunc, BridgeType, BridgeName); - {error, timeout} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, {start_pool_failed, Name, Reason}} -> - {503, - error_msg( - 'SERVICE_UNAVAILABLE', - bin( - io_lib:format( - "failed to start ~p pool for reason ~p", - [Name, Reason] - ) - ) - )}; - {error, Reason} -> - {500, error_msg('INTERNAL_ERROR', Reason)} - end; - {error, _} -> - {400, error_msg('INVALID_NODE', <<"invalid node">>)} +call_operation(NodeOrAll, OperFunc, Args) -> + case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of + ok -> + {204}; + {ok, _} -> + {204}; + {error, not_implemented} -> + %% Should only happen if we call `start` on a node that is + %% still on an older bpapi version that doesn't support it. + maybe_try_restart(NodeOrAll, OperFunc, Args); + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, {start_pool_failed, Name, Reason}} -> + {503, + error_msg( + 'SERVICE_UNAVAILABLE', + bin( + io_lib:format( + "failed to start ~p pool for reason ~p", + [Name, Reason] + ) + ) + )}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end. -maybe_try_restart_node(Node, start_bridge_to_node, BridgeType, BridgeName) -> - call_operation(Node, restart_bridge_to_node, BridgeType, BridgeName); -maybe_try_restart_node(_, _, _, _) -> +maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> + call_operation(all, restart_bridges_to_all_nodes, Args); +maybe_try_restart(Node, start_bridge_to_node, Args) -> + call_operation(Node, restart_bridge_to_node, Args); +maybe_try_restart(_, _, _) -> {501}. -do_bpapi_call(Call, Args) -> - do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args). - +do_bpapi_call(all, Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args); do_bpapi_call(Node, Call, Args) -> do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index bf09cf7f3..d6e8708ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -82,19 +82,27 @@ end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]), ok. -init_per_testcase(t_bad_bpapi_vsn, Config) -> +init_per_testcase(t_broken_bpapi_vsn, Config) -> + meck:new(emqx_bpapi, [passthrough]), + meck:expect(emqx_bpapi, supported_version, 1, -1), + meck:expect(emqx_bpapi, supported_version, 2, -1), + init_per_testcase(commong, Config); +init_per_testcase(t_old_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), meck:expect(emqx_bpapi, supported_version, 1, 1), meck:expect(emqx_bpapi, supported_version, 2, 1), - init_per_testcase(commong, Config); + init_per_testcase(common, Config); init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. -end_per_testcase(t_bad_bpapi_vsn, Config) -> +end_per_testcase(t_broken_bpapi_vsn, Config) -> meck:unload([emqx_bpapi]), - end_per_testcase(commong, Config); + end_per_testcase(common, Config); +end_per_testcase(t_old_bpapi_vsn, Config) -> + meck:unload([emqx_bpapi]), + end_per_testcase(common, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), @@ -442,13 +450,7 @@ t_cascade_delete_actions(Config) -> {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), ok. -t_start_stop_bridges_node(Config) -> - do_start_stop_bridges(node, Config). - -t_start_stop_bridges_cluster(Config) -> - do_start_stop_bridges(cluster, Config). - -t_bad_bpapi_vsn(Config) -> +t_broken_bpapi_vsn(Config) -> Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), Name = <<"t_bad_bpapi_vsn">>, @@ -458,10 +460,36 @@ t_bad_bpapi_vsn(Config) -> ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + %% still works since we redirect to 'restart' {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), ok. +t_old_bpapi_vsn(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "abc"), + Name = <<"t_bad_bpapi_vsn">>, + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + {ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>), + %% still works since we redirect to 'restart' + {ok, 204, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(cluster, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, restart, BridgeID), <<"">>), + ok. + +t_start_stop_bridges_node(Config) -> + do_start_stop_bridges(node, Config). + +t_start_stop_bridges_cluster(Config) -> + do_start_stop_bridges(cluster, Config). + do_start_stop_bridges(Type, Config) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), From f89ac54b1789ceab3b7a7fd44b6483c783902e6a Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Feb 2023 13:40:17 +0100 Subject: [PATCH 7/7] style: remove stale comment --- apps/emqx_bridge/src/emqx_bridge_resource.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 6cfe40bc3..d2ce7a9d5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -29,19 +29,19 @@ create/2, create/3, create/4, + create_dry_run/2, recreate/2, recreate/3, - create_dry_run/2, remove/1, remove/2, remove/4, - update/2, - update/3, - update/4, + reset_metrics/1, + restart/2, start/2, stop/2, - restart/2, - reset_metrics/1 + update/2, + update/3, + update/4 ]). %% bi-directional bridge with producer/consumer or ingress/egress configs @@ -119,16 +119,15 @@ to_type_atom(Type) -> reset_metrics(ResourceId) -> emqx_resource:reset_metrics(ResourceId). +restart(Type, Name) -> + emqx_resource:restart(resource_id(Type, Name)). + stop(Type, Name) -> emqx_resource:stop(resource_id(Type, Name)). start(Type, Name) -> emqx_resource:start(resource_id(Type, Name)). -%% we don't provide 'start', as we want an already started bridge to be restarted. -restart(Type, Name) -> - emqx_resource:restart(resource_id(Type, Name)). - create(BridgeId, Conf) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), create(BridgeType, BridgeName, Conf).