From 86f3f5787fc8b5c84fe3f5c392083842ea44cdd8 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 3 Feb 2023 15:20:30 +0100 Subject: [PATCH] feat: allow to manually re-connect disconnected bridge --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 41 ++++-- apps/emqx_bridge/src/emqx_bridge_resource.erl | 6 +- .../src/proto/emqx_bridge_proto_v1.erl | 4 + .../src/proto/emqx_bridge_proto_v2.erl | 122 ++++++++++++++++++ .../test/emqx_bridge_api_SUITE.erl | 40 +++++- apps/emqx_resource/src/emqx_resource.app.src | 2 +- .../src/emqx_resource_manager.erl | 8 +- changes/v5.0.17/feat-9910.en.md | 1 + changes/v5.0.17/feat-9910.zh.md | 1 + 11 files changed, 207 insertions(+), 21 deletions(-) create mode 100644 apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl create mode 100644 changes/v5.0.17/feat-9910.en.md create mode 100644 changes/v5.0.17/feat-9910.zh.md diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 1a1bac140..769145722 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -3,6 +3,7 @@ {emqx_authn,1}. {emqx_authz,1}. {emqx_bridge,1}. +{emqx_bridge,2}. {emqx_broker,1}. {emqx_cm,1}. {emqx_conf,1}. 
diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index ca0001319..0d4b552ee 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index dce99646f..8711535ce 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -413,8 +413,9 @@ schema("/bridges/:id/:operation") -> ], responses => #{ 204 => <<"Operation success">>, - 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"), - 400 => error_schema('INVALID_ID', "Bad bridge ID") + 400 => error_schema('INVALID_ID', "Bad bridge ID"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } }; @@ -434,6 +435,7 @@ schema("/nodes/:node/bridges/:id/:operation") -> 204 => <<"Operation success">>, 400 => error_schema('INVALID_ID', "Bad bridge ID"), 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } @@ -640,10 +642,12 @@ lookup_from_local_node(BridgeType, BridgeName) -> end ). +node_operation_func(<<"start">>) -> start_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(<<"restart">>) -> restart_bridge_to_node; node_operation_func(_) -> invalid. +operation_func(<<"start">>) -> start; operation_func(<<"stop">>) -> stop; operation_func(<<"restart">>) -> restart; operation_func(_) -> invalid. 
@@ -656,11 +660,14 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> RpcFunc = case OperFunc of restart -> restart_bridges_to_all_nodes; + start -> start_bridges_to_all_nodes; stop -> stop_bridges_to_all_nodes end, - case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of + case is_ok(do_bpapi_call(RpcFunc, [Nodes, BridgeType, BridgeName])) of {ok, _} -> {204}; + {error, not_implemented} -> + {501}; {error, [timeout | _]} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, ErrL} -> @@ -858,6 +865,8 @@ unpack_bridge_conf(Type, PackedConf) -> #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), RawConf. +is_ok(Error = {error, _}) -> + Error; is_ok(ResL) -> case lists:filter( @@ -899,13 +908,11 @@ bin(S) when is_binary(S) -> call_operation(Node, OperFunc, BridgeType, BridgeName) -> case emqx_misc:safe_to_existing_atom(Node, utf8) of {ok, TargetNode} -> - case - emqx_bridge_proto_v1:OperFunc( - TargetNode, BridgeType, BridgeName - ) - of + case do_bpapi_call(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]) of ok -> {204}; + {error, not_implemented} -> + {501}; {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, {start_pool_failed, Name, Reason}} -> @@ -926,6 +933,24 @@ call_operation(Node, OperFunc, BridgeType, BridgeName) -> {400, error_msg('INVALID_NODE', <<"invalid node">>)} end. +do_bpapi_call(Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args). + +do_bpapi_call(Node, Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). + +do_bpapi_call_vsn(SupportedVersion, Call, Args) -> + case lists:member(SupportedVersion, supported_versions(Call)) of + true -> + apply(emqx_bridge_proto_v2, Call, Args); + false -> + {error, not_implemented} + end. 
+ +supported_versions(start_bridge_to_node) -> [2]; +supported_versions(start_bridges_to_all_nodes) -> [2]; +supported_versions(_Call) -> [1, 2]. + redact(Term) -> emqx_misc:redact(Term). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d228f2281..6cfe40bc3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge_resource). --include_lib("emqx/include/emqx.hrl"). + -include_lib("emqx/include/logger.hrl"). -export([ @@ -38,6 +38,7 @@ update/2, update/3, update/4, + start/2, stop/2, restart/2, reset_metrics/1 @@ -121,6 +122,9 @@ reset_metrics(ResourceId) -> stop(Type, Name) -> emqx_resource:stop(resource_id(Type, Name)). +start(Type, Name) -> + emqx_resource:start(resource_id(Type, Name)). + %% we don't provide 'start', as we want an already started bridge to be restarted. restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl index 52790ca42..88554893b 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, list_bridges/1, restart_bridge_to_node/3, @@ -36,6 +37,9 @@ introduced_in() -> "5.0.0". +deprecated_since() -> + "5.0.17". + -spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). list_bridges(Node) -> rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). 
diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl new file mode 100644 index 000000000..0fd733380 --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl @@ -0,0 +1,122 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + list_bridges/1, + restart_bridge_to_node/3, + start_bridge_to_node/3, + stop_bridge_to_node/3, + lookup_from_all_nodes/3, + restart_bridges_to_all_nodes/3, + start_bridges_to_all_nodes/3, + stop_bridges_to_all_nodes/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.17". + +-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). +list_bridges(Node) -> + rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec restart_bridge_to_node(node(), key(), key()) -> + term(). +restart_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridge_to_node(node(), key(), key()) -> + term(). 
+start_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridge_to_node(node(), key(), key()) -> + term(). +stop_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_api, + lookup_from_local_node, + [BridgeType, BridgeName], + ?TIMEOUT + ). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 5cb78d3ba..bf09cf7f3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -82,11 +82,19 @@ end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]), ok. 
+init_per_testcase(t_bad_bpapi_vsn, Config) -> + meck:new(emqx_bpapi, [passthrough]), + meck:expect(emqx_bpapi, supported_version, 1, 1), + meck:expect(emqx_bpapi, supported_version, 2, 1), + init_per_testcase(commong, Config); init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. +end_per_testcase(t_bad_bpapi_vsn, Config) -> + meck:unload([emqx_bpapi]), + end_per_testcase(commong, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), @@ -440,6 +448,20 @@ t_start_stop_bridges_node(Config) -> t_start_stop_bridges_cluster(Config) -> do_start_stop_bridges(cluster, Config). +t_bad_bpapi_vsn(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "abc"), + Name = <<"t_bad_bpapi_vsn">>, + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), + {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), + ok. 
+ do_start_stop_bridges(Type, Config) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -463,21 +485,25 @@ do_start_stop_bridges(Type, Config) -> } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% stop it - {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), %% start again - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), + %% start a started bridge + {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), + {ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)), %% restart an already started bridge - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), %% stop it again - {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), %% restart a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := 
<<"connected">>}, jsx:decode(Bridge4)), %% delete the bridge @@ -536,7 +562,7 @@ t_enable_disable_bridges(Config) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). t_reset_bridges(Config) -> - %% assert we there's no bridges at first + %% assert there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), Name = ?BRIDGE_NAME, @@ -557,7 +583,7 @@ t_reset_bridges(Config) -> <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), - {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), + {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 6bd9fd213..4618e94a6 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index db4441d88..4846c8ae6 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -17,7 +17,6 @@ -behaviour(gen_statem). -include("emqx_resource.hrl"). --include("emqx_resource_utils.hrl"). -include_lib("emqx/include/logger.hrl"). 
% API @@ -327,8 +326,11 @@ handle_event({call, From}, set_resource_status_connecting, _State, Data) -> handle_event({call, From}, restart, _State, Data) -> _ = stop_resource(Data), start_resource(Data, From); -% Called when the resource is to be started -handle_event({call, From}, start, stopped, Data) -> +% Called when the resource is to be started (also used for manual reconnect) +handle_event({call, From}, start, State, Data) when + State =:= stopped orelse + State =:= disconnected +-> start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; diff --git a/changes/v5.0.17/feat-9910.en.md b/changes/v5.0.17/feat-9910.en.md new file mode 100644 index 000000000..bf8e0e1b4 --- /dev/null +++ b/changes/v5.0.17/feat-9910.en.md @@ -0,0 +1 @@ +Add `start` operation to bridges API to allow manual reconnect after failure. diff --git a/changes/v5.0.17/feat-9910.zh.md b/changes/v5.0.17/feat-9910.zh.md new file mode 100644 index 000000000..f165a44b9 --- /dev/null +++ b/changes/v5.0.17/feat-9910.zh.md @@ -0,0 +1 @@ +在桥梁 API 中增加 `start` 操作,允许失败后手动重新连接。