From 6511693b2ee139216a93cf81f8a719602d691f40 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 21 Dec 2023 14:14:41 -0300 Subject: [PATCH] refactor(action_api): prepare for `/sources` HTTP API --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_bridge/src/emqx_bridge_api.erl | 24 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 42 +++- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 216 ++++++++++++------ .../src/proto/emqx_bridge_proto_v6.erl | 196 ++++++++++++++++ .../test/emqx_bridge_api_SUITE.erl | 7 +- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 16 +- .../test/emqx_bridge_v2_testlib.erl | 18 +- 8 files changed, 404 insertions(+), 116 deletions(-) create mode 100644 apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 9721a7f2f..9bd824242 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -8,6 +8,7 @@ {emqx_bridge,3}. {emqx_bridge,4}. {emqx_bridge,5}. +{emqx_bridge,6}. {emqx_broker,1}. {emqx_cm,1}. {emqx_cm,2}. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 3168ae590..8f36fd700 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -49,6 +49,11 @@ -export([lookup_from_local_node/2]). -export([get_metrics_from_local_node/2]). +%% only for testting/mocking +-export([supported_versions/1]). + +-define(BPAPI_NAME, emqx_bridge). + -define(BRIDGE_NOT_ENABLED, ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>) ). @@ -1102,18 +1107,18 @@ maybe_try_restart(_, _, _) -> do_bpapi_call(all, Call, Args) -> maybe_unwrap( - do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args) + do_bpapi_call_vsn(emqx_bpapi:supported_version(?BPAPI_NAME), Call, Args) ); do_bpapi_call(Node, Call, Args) -> case lists:member(Node, mria:running_nodes()) of true -> - do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args); + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, ?BPAPI_NAME), Call, Args); false -> {error, {node_not_found, Node}} end. do_bpapi_call_vsn(SupportedVersion, Call, Args) -> - case lists:member(SupportedVersion, supported_versions(Call)) of + case lists:member(SupportedVersion, ?MODULE:supported_versions(Call)) of true -> apply(emqx_bridge_proto_v4, Call, Args); false -> @@ -1125,10 +1130,15 @@ maybe_unwrap({error, not_implemented}) -> maybe_unwrap(RpcMulticallResult) -> emqx_rpc:unwrap_erpc(RpcMulticallResult). -supported_versions(start_bridge_to_node) -> [2, 3, 4, 5]; -supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4, 5]; -supported_versions(get_metrics_from_all_nodes) -> [4, 5]; -supported_versions(_Call) -> [1, 2, 3, 4, 5]. +supported_versions(start_bridge_to_node) -> bpapi_version_range(2, latest); +supported_versions(start_bridges_to_all_nodes) -> bpapi_version_range(2, latest); +supported_versions(get_metrics_from_all_nodes) -> bpapi_version_range(4, latest); +supported_versions(_Call) -> bpapi_version_range(1, latest). + +%% [From, To] (inclusive on both ends) +bpapi_version_range(From, latest) -> + ThisNodeVsn = emqx_bpapi:supported_version(node(), ?BPAPI_NAME), + lists:seq(From, ThisNodeVsn). redact(Term) -> emqx_utils:redact(Term). diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index c46013c0c..fb8ae2e2e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -43,6 +43,7 @@ lookup/2, lookup/3, create/3, + create/4, %% The remove/2 function is only for internal use as it may create %% rules with broken dependencies remove/2, @@ -50,7 +51,8 @@ %% The following is the remove function that is called by the HTTP API %% It also checks for rule action dependencies and optionally removes %% them - check_deps_and_remove/3 + check_deps_and_remove/3, + check_deps_and_remove/4 ]). %% Operations @@ -62,9 +64,11 @@ send_message/4, query/4, start/2, + start/3, reset_metrics/2, reset_metrics/3, create_dry_run/2, + create_dry_run/3, get_metrics/2, get_metrics/3 ]). @@ -150,6 +154,10 @@ -type bridge_v2_type() :: binary() | atom() | [byte()]. -type bridge_v2_name() :: binary() | atom() | [byte()]. +-type root_cfg_key() :: ?ROOT_KEY_ACTIONS | ?ROOT_KEY_SOURCES. + +-export_type([root_cfg_key/0]). + %%==================================================================== %%==================================================================== @@ -212,7 +220,7 @@ unload_bridges(ConfRooKey) -> lookup(Type, Name) -> lookup(?ROOT_KEY_ACTIONS, Type, Name). --spec lookup(sources | actions, bridge_v2_type(), bridge_v2_name()) -> +-spec lookup(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}. lookup(ConfRootName, Type, Name) -> case emqx:get_raw_config([ConfRootName, Type, Name], not_found) of @@ -315,6 +323,11 @@ remove(ConfRootKey, BridgeType, BridgeName) -> -spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}. check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) -> + check_deps_and_remove(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, AlsoDeleteActions). + +-spec check_deps_and_remove(root_cfg_key(), bridge_v2_type(), bridge_v2_name(), boolean()) -> + ok | {error, any()}. +check_deps_and_remove(ConfRooKey, BridgeType, BridgeName, AlsoDeleteActions) -> AlsoDelete = case AlsoDeleteActions of true -> [rule_actions]; @@ -328,7 +341,7 @@ check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) -> ) of ok -> - remove(BridgeType, BridgeName); + remove(ConfRooKey, BridgeType, BridgeName); {error, Reason} -> {error, Reason} end. @@ -539,11 +552,14 @@ disable_enable(ConfRootKey, Action, BridgeType, BridgeName) when ?ENABLE_OR_DISA %% is something else than connected after starting the connector or if an %% error occurred when the connector was started. -spec start(term(), term()) -> ok | {error, Reason :: term()}. -start(BridgeV2Type, Name) -> +start(ActionOrSourceType, Name) -> + start(?ROOT_KEY_ACTIONS, ActionOrSourceType, Name). + +-spec start(root_cfg_key(), term(), term()) -> ok | {error, Reason :: term()}. +start(ConfRootKey, BridgeV2Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - ConfRootKey = ?ROOT_KEY_ACTIONS, connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, true). connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) -> @@ -694,10 +710,15 @@ health_check(ConfRootKey, BridgeType, BridgeName) -> end. -spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}. -create_dry_run(Type, Conf0) -> +create_dry_run(Type, Conf) -> + create_dry_run(?ROOT_KEY_ACTIONS, Type, Conf). + +-spec create_dry_run(root_cfg_key(), bridge_v2_type(), Config :: map()) -> ok | {error, term()}. +create_dry_run(ConfRootKey, Type, Conf0) -> Conf1 = maps:without([<<"name">>], Conf0), TypeBin = bin(Type), - RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, + ConfRootKeyBin = bin(ConfRootKey), + RawConf = #{ConfRootKeyBin => #{TypeBin => #{<<"temp_name">> => Conf1}}}, %% Check config try _ = @@ -722,6 +743,9 @@ create_dry_run(Type, Conf0) -> end. create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> + create_dry_run_helper(?ROOT_KEY_ACTIONS, BridgeType, ConnectorRawConf, BridgeV2RawConf). + +create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) -> BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), ConnectorType = connector_type(BridgeType), OnReadyCallback = @@ -730,7 +754,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> ChannelTestId = id(BridgeType, BridgeName, ConnectorName), Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), AugmentedConf = augment_channel_config( - ?ROOT_KEY_ACTIONS, + ConfRootKey, BridgeType, BridgeName, Conf @@ -756,6 +780,8 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> get_metrics(Type, Name) -> get_metrics(?ROOT_KEY_ACTIONS, Type, Name). +-spec get_metrics(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) -> + emqx_metrics_worker:metrics(). get_metrics(ConfRootKey, Type, Name) -> emqx_resource:get_metrics(id_with_root_name(ConfRootKey, Type, Name)). diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 254390a36..3f0d18fae 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -26,6 +26,9 @@ -import(hoconsc, [mk/2, array/1, enum/1]). -import(emqx_utils, [redact/1]). +-define(ROOT_KEY_ACTIONS, actions). +-define(ROOT_KEY_SOURCES, sources). + %% Swagger specs from hocon schema -export([ api_spec/0, @@ -48,7 +51,14 @@ ]). %% BpAPI / RPC Targets --export([lookup_from_local_node/2, get_metrics_from_local_node/2]). +-export([ + lookup_from_local_node/2, + get_metrics_from_local_node/2, + lookup_from_local_node_v6/3, + get_metrics_from_local_node_v6/3 +]). + +-define(BPAPI_NAME, emqx_bridge). -define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME), ?NOT_FOUND( @@ -393,16 +403,51 @@ schema("/action_types") -> }. '/actions'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> - case emqx_bridge_v2:lookup(BridgeType, BridgeName) of - {ok, _} -> - ?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>); - {error, not_found} -> - Conf = filter_out_request_body(Conf0), - create_bridge(BridgeType, BridgeName, Conf) - end; + handle_create(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, Conf0); '/actions'(get, _Params) -> - Nodes = mria:running_nodes(), - NodeReplies = emqx_bridge_proto_v5:v2_list_bridges_on_nodes(Nodes), + handle_list(?ROOT_KEY_ACTIONS). + +'/actions/:id'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, 200)); +'/actions/:id'(put, #{bindings := #{id := Id}, body := Conf0}) -> + handle_update(?ROOT_KEY_ACTIONS, Id, Conf0); +'/actions/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> + handle_delete(?ROOT_KEY_ACTIONS, Id, Qs). + +'/actions/:id/metrics'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(?ROOT_KEY_ACTIONS, BridgeType, BridgeName)). + +'/actions/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> + handle_reset_metrics(?ROOT_KEY_ACTIONS, Id). + +'/actions/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> + handle_disable_enable(?ROOT_KEY_ACTIONS, Id, Enable). + +'/actions/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op} +}) -> + handle_operation(?ROOT_KEY_ACTIONS, Id, Op). + +'/nodes/:node/actions/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op, node := Node} +}) -> + handle_node_operation(?ROOT_KEY_ACTIONS, Node, Id, Op). + +'/actions_probe'(post, Request) -> + handle_probe(?ROOT_KEY_ACTIONS, Request). + +'/action_types'(get, _Request) -> + ?OK(emqx_bridge_v2_schema:types()). + +%%------------------------------------------------------------------------------ +%% Handlers +%%------------------------------------------------------------------------------ + +handle_list(ConfRootKey) -> + Nodes = emqx:running_nodes(), + NodeReplies = emqx_bridge_proto_v6:v2_list_bridges_on_nodes_v6(Nodes, ConfRootKey), case is_ok(NodeReplies) of {ok, NodeBridges} -> AllBridges = [ @@ -414,34 +459,44 @@ schema("/action_types") -> ?INTERNAL_ERROR(Reason) end. -'/actions/:id'(get, #{bindings := #{id := Id}}) -> - ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); -'/actions/:id'(put, #{bindings := #{id := Id}, body := Conf0}) -> +handle_create(ConfRootKey, Type, Name, Conf0) -> + case emqx_bridge_v2:lookup(ConfRootKey, Type, Name) of + {ok, _} -> + ?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>); + {error, not_found} -> + Conf = filter_out_request_body(Conf0), + create_bridge(ConfRootKey, Type, Name, Conf) + end. + +handle_update(ConfRootKey, Id, Conf0) -> Conf1 = filter_out_request_body(Conf0), ?TRY_PARSE_ID( Id, - case emqx_bridge_v2:lookup(BridgeType, BridgeName) of + case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of {ok, _} -> RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), Conf = emqx_utils:deobfuscate(Conf1, RawConf), - update_bridge(BridgeType, BridgeName, Conf); + update_bridge(ConfRootKey, BridgeType, BridgeName, Conf); {error, not_found} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) end - ); -'/actions/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> + ). + +handle_delete(ConfRootKey, Id, QueryStringOpts) -> ?TRY_PARSE_ID( Id, case emqx_bridge_v2:lookup(BridgeType, BridgeName) of {ok, _} -> AlsoDeleteActions = - case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of + case maps:get(<<"also_delete_dep_actions">>, QueryStringOpts, <<"false">>) of <<"true">> -> true; true -> true; _ -> false end, case - emqx_bridge_v2:check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) + emqx_bridge_v2:check_deps_and_remove( + ConfRootKey, BridgeType, BridgeName, AlsoDeleteActions + ) of ok -> ?NO_CONTENT; @@ -465,23 +520,22 @@ schema("/action_types") -> end ). -'/actions/:id/metrics'(get, #{bindings := #{id := Id}}) -> - ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(BridgeType, BridgeName)). - -'/actions/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> +handle_reset_metrics(ConfRootKey, Id) -> ?TRY_PARSE_ID( Id, begin ActionType = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType), - ok = emqx_bridge_v2:reset_metrics(ActionType, BridgeName), + ok = emqx_bridge_v2:reset_metrics(ConfRootKey, ActionType, BridgeName), ?NO_CONTENT end ). -'/actions/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> +handle_disable_enable(ConfRootKey, Id, Enable) -> ?TRY_PARSE_ID( Id, - case emqx_bridge_v2:disable_enable(enable_func(Enable), BridgeType, BridgeName) of + case + emqx_bridge_v2:disable_enable(ConfRootKey, enable_func(Enable), BridgeType, BridgeName) + of {ok, _} -> ?NO_CONTENT; {error, {pre_config_update, _, bridge_not_found}} -> @@ -495,41 +549,37 @@ schema("/action_types") -> end ). -'/actions/:id/:operation'(post, #{ - bindings := - #{id := Id, operation := Op} -}) -> +handle_operation(ConfRootKey, Id, Op) -> ?TRY_PARSE_ID( Id, begin OperFunc = operation_func(all, Op), - Nodes = mria:running_nodes(), - call_operation_if_enabled(all, OperFunc, [Nodes, BridgeType, BridgeName]) + Nodes = emqx:running_nodes(), + call_operation_if_enabled(all, OperFunc, [Nodes, ConfRootKey, BridgeType, BridgeName]) end ). -'/nodes/:node/actions/:id/:operation'(post, #{ - bindings := - #{id := Id, operation := Op, node := Node} -}) -> +handle_node_operation(ConfRootKey, Node, Id, Op) -> ?TRY_PARSE_ID( Id, case emqx_utils:safe_to_existing_atom(Node, utf8) of {ok, TargetNode} -> OperFunc = operation_func(TargetNode, Op), - call_operation_if_enabled(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]); + call_operation_if_enabled(TargetNode, OperFunc, [ + TargetNode, ConfRootKey, BridgeType, BridgeName + ]); {error, _} -> ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>) end ). -'/actions_probe'(post, Request) -> +handle_probe(ConfRootKey, Request) -> RequestMeta = #{module => ?MODULE, method => post, path => "/actions_probe"}, case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of - {ok, #{body := #{<<"type">> := ConnType} = Params}} -> + {ok, #{body := #{<<"type">> := Type} = Params}} -> Params1 = maybe_deobfuscate_bridge_probe(Params), Params2 = maps:remove(<<"type">>, Params1), - case emqx_bridge_v2:create_dry_run(ConnType, Params2) of + case emqx_bridge_v2:create_dry_run(ConfRootKey, Type, Params2) of ok -> ?NO_CONTENT; {error, #{kind := validation_error} = Reason0} -> @@ -548,9 +598,7 @@ schema("/action_types") -> redact(BadRequest) end. -'/action_types'(get, _Request) -> - ?OK(emqx_bridge_v2_schema:types()). - +%%% API helpers maybe_deobfuscate_bridge_probe(#{<<"type">> := ActionType, <<"name">> := BridgeName} = Params) -> case emqx_bridge_v2:lookup(ActionType, BridgeName) of {ok, #{raw_config := RawConf}} -> @@ -564,7 +612,6 @@ maybe_deobfuscate_bridge_probe(#{<<"type">> := ActionType, <<"name">> := BridgeN maybe_deobfuscate_bridge_probe(Params) -> Params. -%%% API helpers is_ok(ok) -> ok; is_ok(OkResult = {ok, _}) -> @@ -587,9 +634,16 @@ is_ok(ResL) -> end. %% bridge helpers -lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> - Nodes = mria:running_nodes(), - case is_ok(emqx_bridge_proto_v5:v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of +-spec lookup_from_all_nodes(emqx_bridge_v2:root_cfg_key(), _, _, _) -> _. +lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) -> + Nodes = emqx:running_nodes(), + case + is_ok( + emqx_bridge_proto_v6:v2_lookup_from_all_nodes_v6( + Nodes, ConfRootKey, BridgeType, BridgeName + ) + ) + of {ok, [{ok, _} | _] = Results} -> {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> @@ -598,10 +652,10 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> ?INTERNAL_ERROR(Reason) end. -get_metrics_from_all_nodes(ActionType, ActionName) -> +get_metrics_from_all_nodes(ConfRootKey, Type, Name) -> Nodes = emqx:running_nodes(), Result = maybe_unwrap( - emqx_bridge_proto_v5:v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName) + emqx_bridge_proto_v6:v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, Type, Name) ), case Result of Metrics when is_list(Metrics) -> @@ -610,22 +664,25 @@ get_metrics_from_all_nodes(ActionType, ActionName) -> ?INTERNAL_ERROR(Reason) end. -operation_func(all, start) -> v2_start_bridge_to_all_nodes; -operation_func(_Node, start) -> v2_start_bridge_to_node. +operation_func(all, start) -> v2_start_bridge_to_all_nodes_v6; +operation_func(_Node, start) -> v2_start_bridge_to_node_v6; +operation_func(all, lookup) -> v2_lookup_from_all_nodes_v6; +operation_func(all, list) -> v2_list_bridges_on_nodes_v6; +operation_func(all, get_metrics) -> v2_get_metrics_from_all_nodes_v6. -call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName]) -> - try is_enabled_bridge(BridgeType, BridgeName) of +call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType, BridgeName]) -> + try is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) of false -> ?BRIDGE_NOT_ENABLED; true -> - call_operation(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName]) + call_operation(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType, BridgeName]) catch throw:not_found -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) end. -is_enabled_bridge(BridgeType, BridgeName) -> - try emqx_bridge_v2:lookup(BridgeType, binary_to_existing_atom(BridgeName)) of +is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) -> + try emqx_bridge_v2:lookup(ConfRootKey, BridgeType, binary_to_existing_atom(BridgeName)) of {ok, #{raw_config := ConfMap}} -> maps:get(<<"enable">>, ConfMap, false); {error, not_found} -> @@ -637,7 +694,7 @@ is_enabled_bridge(BridgeType, BridgeName) -> throw(not_found) end. -call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> +call_operation(NodeOrAll, OperFunc, Args = [_Nodes, _ConfRootKey, BridgeType, BridgeName]) -> case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> ?NO_CONTENT; @@ -668,12 +725,12 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> do_bpapi_call(all, Call, Args) -> maybe_unwrap( - do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args) + do_bpapi_call_vsn(emqx_bpapi:supported_version(?BPAPI_NAME), Call, Args) ); do_bpapi_call(Node, Call, Args) -> - case lists:member(Node, mria:running_nodes()) of + case lists:member(Node, emqx:running_nodes()) of true -> - do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args); + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, ?BPAPI_NAME), Call, Args); false -> {error, {node_not_found, Node}} end. @@ -681,7 +738,7 @@ do_bpapi_call(Node, Call, Args) -> do_bpapi_call_vsn(Version, Call, Args) -> case is_supported_version(Version, Call) of true -> - apply(emqx_bridge_proto_v5, Call, Args); + apply(emqx_bridge_proto_v6, Call, Args); false -> {error, not_implemented} end. @@ -689,7 +746,12 @@ do_bpapi_call_vsn(Version, Call, Args) -> is_supported_version(Version, Call) -> lists:member(Version, supported_versions(Call)). -supported_versions(_Call) -> [5]. +supported_versions(_Call) -> bpapi_version_range(6, latest). + +%% [From, To] (inclusive on both ends) +bpapi_version_range(From, latest) -> + ThisNodeVsn = emqx_bpapi:supported_version(node(), ?BPAPI_NAME), + lists:seq(From, ThisNodeVsn). maybe_unwrap({error, not_implemented}) -> {error, not_implemented}; @@ -767,10 +829,22 @@ lookup_from_local_node(BridgeType, BridgeName) -> Error -> Error end. +%% RPC Target +-spec lookup_from_local_node_v6(emqx_bridge_v2:root_cfg_key(), _, _) -> _. +lookup_from_local_node_v6(ConfRootKey, BridgeType, BridgeName) -> + case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of + {ok, Res} -> {ok, format_resource(Res, node())}; + Error -> Error + end. + %% RPC Target get_metrics_from_local_node(ActionType, ActionName) -> format_metrics(emqx_bridge_v2:get_metrics(ActionType, ActionName)). +%% RPC Target +get_metrics_from_local_node_v6(ConfRootKey, Type, Name) -> + format_metrics(emqx_bridge_v2:get_metrics(ConfRootKey, Type, Name)). + %% resource format_resource( #{ @@ -938,13 +1012,13 @@ format_resource_data(error, Error, Result) -> format_resource_data(K, V, Result) -> Result#{K => V}. -create_bridge(BridgeType, BridgeName, Conf) -> - create_or_update_bridge(BridgeType, BridgeName, Conf, 201). +create_bridge(ConfRootKey, BridgeType, BridgeName, Conf) -> + create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, 201). -update_bridge(BridgeType, BridgeName, Conf) -> - create_or_update_bridge(BridgeType, BridgeName, Conf, 200). +update_bridge(ConfRootKey, BridgeType, BridgeName, Conf) -> + create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, 200). -create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> +create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, HttpStatusCode) -> Check = try is_binary(BridgeType) andalso emqx_resource:validate_type(BridgeType), @@ -955,15 +1029,15 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> end, case Check of ok -> - do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode); + do_create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, HttpStatusCode); BadRequest -> BadRequest end. -do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> - case emqx_bridge_v2:create(BridgeType, BridgeName, Conf) of +do_create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, HttpStatusCode) -> + case emqx_bridge_v2:create(ConfRootKey, BridgeType, BridgeName, Conf) of {ok, _} -> - lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode); + lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, HttpStatusCode); {error, {PreOrPostConfigUpdate, _HandlerMod, Reason}} when PreOrPostConfigUpdate =:= pre_config_update; PreOrPostConfigUpdate =:= post_config_update diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl new file mode 100644 index 000000000..d6fe68466 --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl @@ -0,0 +1,196 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v6). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + list_bridges_on_nodes/1, + restart_bridge_to_node/3, + start_bridge_to_node/3, + stop_bridge_to_node/3, + lookup_from_all_nodes/3, + get_metrics_from_all_nodes/3, + restart_bridges_to_all_nodes/3, + start_bridges_to_all_nodes/3, + stop_bridges_to_all_nodes/3, + + %% introduced in v6 + v2_lookup_from_all_nodes_v6/4, + v2_list_bridges_on_nodes_v6/2, + v2_get_metrics_from_all_nodes_v6/4, + v2_start_bridge_to_node_v6/4, + v2_start_bridge_to_all_nodes_v6/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.5.0". + +-spec list_bridges_on_nodes([node()]) -> + emqx_rpc:erpc_multicall([emqx_resource:resource_data()]). +list_bridges_on_nodes(Nodes) -> + erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec restart_bridge_to_node(node(), key(), key()) -> + term(). +restart_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridge_to_node(node(), key(), key()) -> + term(). +start_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridge_to_node(node(), key(), key()) -> + term(). +stop_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(ok). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(ok). +start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(ok). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(term()). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_api, + lookup_from_local_node, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec get_metrics_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()). +get_metrics_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_api, + get_metrics_from_local_node, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +%%-------------------------------------------------------------------------------- +%% introduced in v6 +%%-------------------------------------------------------------------------------- + +%% V2 Calls +-spec v2_lookup_from_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) -> + emqx_rpc:erpc_multicall(term()). +v2_lookup_from_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_v2_api, + lookup_from_local_node_v6, + [ConfRootKey, BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec v2_list_bridges_on_nodes_v6([node()], emqx_bridge_v2:root_cfg_key()) -> + emqx_rpc:erpc_multicall([emqx_resource:resource_data()]). +v2_list_bridges_on_nodes_v6(Nodes, ConfRootKey) -> + erpc:multicall(Nodes, emqx_bridge_v2, list, [ConfRootKey], ?TIMEOUT). + +-spec v2_get_metrics_from_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) -> + emqx_rpc:erpc_multicall(term()). +v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, ActionType, ActionName) -> + erpc:multicall( + Nodes, + emqx_bridge_v2_api, + get_metrics_from_local_node_v6, + [ConfRootKey, ActionType, ActionName], + ?TIMEOUT + ). + +-spec v2_start_bridge_to_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) -> + emqx_rpc:erpc_multicall(ok). +v2_start_bridge_to_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_v2, + start, + [ConfRootKey, BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec v2_start_bridge_to_node_v6(node(), emqx_bridge_v2:root_cfg_key(), key(), key()) -> + term(). +v2_start_bridge_to_node_v6(Node, ConfRootKey, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_v2, + start, + [ConfRootKey, BridgeType, BridgeName], + ?TIMEOUT + ). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 1314fef48..112e24e63 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -160,8 +160,9 @@ end_per_group(_, Config) -> init_per_testcase(t_broken_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), - meck:expect(emqx_bpapi, supported_version, 1, -1), meck:expect(emqx_bpapi, supported_version, 2, -1), + meck:new(emqx_bridge_api, [passthrough]), + meck:expect(emqx_bridge_api, supported_versions, 1, []), init_per_testcase(common, Config); init_per_testcase(t_old_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), @@ -173,10 +174,10 @@ init_per_testcase(_, Config) -> [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. end_per_testcase(t_broken_bpapi_vsn, Config) -> - meck:unload([emqx_bpapi]), + meck:unload(), end_per_testcase(common, Config); end_per_testcase(t_old_bpapi_vsn, Config) -> - meck:unload([emqx_bpapi]), + meck:unload(), end_per_testcase(common, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index df4560e6e..7ab8c68a6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -196,20 +196,10 @@ delete_bridge_http_api_v1(Opts) -> op_bridge_api(Op, BridgeType, BridgeName) -> BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), - Res = - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of - {ok, {Status = {_, 204, _}, Headers, Body}} -> - {ok, {Status, Headers, Body}}; - {ok, {Status, Headers, Body}} -> - {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; - {error, {Status, Headers, Body}} -> - {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; - Error -> - Error - end, + Method = post, + Params = [], + Res = emqx_bridge_v2_testlib:request(Method, Path, Params), ct:pal("bridge op result: ~p", [Res]), Res. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 5b821fea4..f7dd74161 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -308,21 +308,11 @@ update_bridge_api(Config, Overrides) -> op_bridge_api(Op, BridgeType, BridgeName) -> BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId, Op]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), - Res = - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of - {ok, {Status = {_, 204, _}, Headers, Body}} -> - {ok, {Status, Headers, Body}}; - {ok, {Status, Headers, Body}} -> - {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; - {error, {Status, Headers, Body}} -> - {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; - Error -> - Error - end, - ct:pal("bridge op result: ~p", [Res]), + Method = post, + Params = [], + Res = request(Method, Path, Params), + ct:pal("bridge op result:\n ~p", [Res]), Res. probe_bridge_api(Config) ->