From 28de7c89c7c02bc6114fb62133e5ee11d2b8110d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 8 Jan 2024 17:24:55 -0300 Subject: [PATCH] feat: add `/sources*` HTTP APIs --- apps/emqx_bridge/src/emqx_bridge_resource.erl | 20 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 23 +- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 384 ++++++++++++++++-- .../src/proto/emqx_bridge_proto_v6.erl | 12 +- .../src/schema/emqx_bridge_v2_schema.erl | 239 ++++++++--- ...qx_bridge_v1_compatibility_layer_SUITE.erl | 2 +- .../test/emqx_bridge_v2_testlib.erl | 187 +++++---- .../emqx_bridge/test/emqx_bridge_v2_tests.erl | 2 +- .../src/emqx_bridge_mqtt_connector.erl | 4 +- .../src/emqx_bridge_mqtt_pubsub_schema.erl | 23 +- .../emqx_bridge_mqtt_v2_subscriber_SUITE.erl | 175 ++++++++ 11 files changed, 870 insertions(+), 201 deletions(-) create mode 100644 apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 0a870abb8..ec7a7431b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -23,6 +23,7 @@ bridge_to_resource_type/1, resource_id/1, resource_id/2, + resource_id/3, bridge_id/2, parse_bridge_id/1, parse_bridge_id/2, @@ -62,6 +63,9 @@ ?IS_BI_DIR_BRIDGE(TYPE) ). +-define(ROOT_KEY_ACTIONS, actions). +-define(ROOT_KEY_SOURCES, sources). + -if(?EMQX_RELEASE_EDITION == ee). bridge_to_resource_type(BridgeType) when is_binary(BridgeType) -> bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8)); @@ -85,11 +89,21 @@ bridge_impl_module(_BridgeType) -> undefined. -endif. resource_id(BridgeId) when is_binary(BridgeId) -> + resource_id_for_kind(?ROOT_KEY_ACTIONS, BridgeId). + +resource_id(BridgeType, BridgeName) -> + resource_id(?ROOT_KEY_ACTIONS, BridgeType, BridgeName). + +resource_id(ConfRootKey, BridgeType, BridgeName) -> + BridgeId = bridge_id(BridgeType, BridgeName), + resource_id_for_kind(ConfRootKey, BridgeId). + +resource_id_for_kind(ConfRootKey, BridgeId) when is_binary(BridgeId) -> case binary:split(BridgeId, <<":">>) of [Type, _Name] -> case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> - emqx_bridge_v2:bridge_v1_id_to_connector_resource_id(BridgeId); + emqx_bridge_v2:bridge_v1_id_to_connector_resource_id(ConfRootKey, BridgeId); false -> <<"bridge:", BridgeId/binary>> end; @@ -97,10 +111,6 @@ resource_id(BridgeId) when is_binary(BridgeId) -> invalid_data(<<"should be of pattern {type}:{name}, but got ", BridgeId/binary>>) end. -resource_id(BridgeType, BridgeName) -> - BridgeId = bridge_id(BridgeType, BridgeName), - resource_id(BridgeId). - bridge_id(BridgeType, BridgeName) -> Name = bin(BridgeName), Type = bin(BridgeType), diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index fb8ae2e2e..67aeeca41 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -91,6 +91,7 @@ id/2, id/3, bridge_v1_is_valid/2, + bridge_v1_is_valid/3, extract_connector_id_from_bridge_v2_id/1 ]). @@ -128,6 +129,7 @@ %% Exception from the naming convention: bridge_v2_type_to_bridge_v1_type/2, bridge_v1_id_to_connector_resource_id/1, + bridge_v1_id_to_connector_resource_id/2, bridge_v1_enable_disable/3, bridge_v1_restart/2, bridge_v1_stop/2, @@ -567,7 +569,7 @@ connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, DoHe ConfRootKey, BridgeV2Type, Name, - lookup_conf(BridgeV2Type, Name), + lookup_conf(ConfRootKey, BridgeV2Type, Name), ConnectorOpFun, DoHealthCheck ). @@ -1191,8 +1193,11 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> %% * The corresponding bridge v2 should exist %% * The connector for the bridge v2 should have exactly one channel bridge_v1_is_valid(BridgeV1Type, BridgeName) -> + bridge_v1_is_valid(?ROOT_KEY_ACTIONS, BridgeV1Type, BridgeName). + +bridge_v1_is_valid(ConfRootKey, BridgeV1Type, BridgeName) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), - case lookup_conf(BridgeV2Type, BridgeName) of + case lookup_conf(ConfRootKey, BridgeV2Type, BridgeName) of {error, _} -> %% If the bridge v2 does not exist, it is a valid bridge v1 true; @@ -1241,17 +1246,20 @@ bridge_v1_list_and_transform() -> bridge_v1_lookup_and_transform(ActionType, Name) -> case lookup_actions_or_sources(ActionType, Name) of - {ok, ConfRootName, + {ok, ConfRootKey, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} -> BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig), HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType), - case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of + case + HasBridgeV1Equivalent andalso + ?MODULE:bridge_v1_is_valid(ConfRootKey, BridgeV1Type, Name) + of true -> ConnectorType = connector_type(ActionType), case emqx_connector:lookup(ConnectorType, ConnectorName) of {ok, Connector} -> bridge_v1_lookup_and_transform_helper( - ConfRootName, + ConfRootKey, BridgeV1Type, Name, ActionType, @@ -1718,11 +1726,14 @@ connector_has_channels(BridgeV2Type, ConnectorName) -> end. bridge_v1_id_to_connector_resource_id(BridgeId) -> + bridge_v1_id_to_connector_resource_id(?ROOT_KEY_ACTIONS, BridgeId). + +bridge_v1_id_to_connector_resource_id(ConfRootKey, BridgeId) -> case binary:split(BridgeId, <<":">>) of [Type, Name] -> BridgeV2Type = bin(bridge_v1_type_to_bridge_v2_type(Type)), ConnectorName = - case lookup_conf(BridgeV2Type, Name) of + case lookup_conf(ConfRootKey, BridgeV2Type, Name) of #{connector := Con} -> Con; {error, Reason} -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 3f0d18fae..d4401cfd0 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -37,7 +37,7 @@ namespace/0 ]). -%% API callbacks +%% API callbacks : actions -export([ '/actions'/2, '/actions/:id'/2, @@ -49,6 +49,18 @@ '/actions_probe'/2, '/action_types'/2 ]). +%% API callbacks : sources +-export([ + '/sources'/2, + '/sources/:id'/2, + '/sources/:id/metrics'/2, + '/sources/:id/metrics/reset'/2, + '/sources/:id/enable/:enable'/2, + '/sources/:id/:operation'/2, + '/nodes/:node/sources/:id/:operation'/2, + '/sources_probe'/2, + '/source_types'/2 +]). %% BpAPI / RPC Targets -export([ @@ -81,13 +93,16 @@ end ). -namespace() -> "actions". +namespace() -> "actions_and_sources". api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> [ + %%============= + %% Actions + %%============= "/actions", "/actions/:id", "/actions/:id/enable/:enable", @@ -98,7 +113,21 @@ paths() -> "/actions/:id/metrics", "/actions/:id/metrics/reset", "/actions_probe", - "/action_types" + "/action_types", + %%============= + %% Sources + %%============= + "/sources", + "/sources/:id", + "/sources/:id/enable/:enable", + "/sources/:id/:operation", + "/nodes/:node/sources/:id/:operation", + %% %% Caveat: metrics paths must come *after* `/:operation', otherwise minirest will + %% %% try to match the latter first, trying to interpret `metrics' as an operation... + "/sources/:id/metrics", + "/sources/:id/metrics/reset", + "/sources_probe" + %% "/source_types" ]. error_schema(Code, Message) -> @@ -111,17 +140,28 @@ error_schema(Codes, Message, ExtraFields) when is_list(Message) -> error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(Message) -> ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message). -get_response_body_schema() -> +actions_get_response_body_schema() -> emqx_dashboard_swagger:schema_with_examples( - emqx_bridge_v2_schema:get_response(), - bridge_info_examples(get) + emqx_bridge_v2_schema:actions_get_response(), + bridge_info_examples(get, ?ROOT_KEY_ACTIONS) ). -bridge_info_examples(Method) -> - emqx_bridge_v2_schema:examples(Method). +sources_get_response_body_schema() -> + emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_v2_schema:sources_get_response(), + bridge_info_examples(get, ?ROOT_KEY_SOURCES) + ). -bridge_info_array_example(Method) -> - lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))). +bridge_info_examples(Method, ?ROOT_KEY_ACTIONS) -> + emqx_bridge_v2_schema:actions_examples(Method); +bridge_info_examples(Method, ?ROOT_KEY_SOURCES) -> + emqx_bridge_v2_schema:sources_examples(Method). + +bridge_info_array_example(Method, ConfRootKey) -> + lists:map( + fun(#{value := Config}) -> Config end, + maps:values(bridge_info_examples(Method, ConfRootKey)) + ). param_path_id() -> {id, @@ -195,6 +235,9 @@ param_path_enable() -> } )}. +%%================================================================================ +%% Actions +%%================================================================================ schema("/actions") -> #{ 'operationId' => '/actions', @@ -204,8 +247,8 @@ schema("/actions") -> description => ?DESC("desc_api1"), responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( - array(emqx_bridge_v2_schema:get_response()), - bridge_info_array_example(get) + array(emqx_bridge_v2_schema:actions_get_response()), + bridge_info_array_example(get, ?ROOT_KEY_ACTIONS) ) } }, @@ -214,11 +257,11 @@ schema("/actions") -> summary => <<"Create bridge">>, description => ?DESC("desc_api2"), 'requestBody' => emqx_dashboard_swagger:schema_with_examples( - emqx_bridge_v2_schema:post_request(), - bridge_info_examples(post) + emqx_bridge_v2_schema:actions_post_request(), + bridge_info_examples(post, ?ROOT_KEY_ACTIONS) ), responses => #{ - 201 => get_response_body_schema(), + 201 => actions_get_response_body_schema(), 400 => error_schema('ALREADY_EXISTS', "Bridge already exists") } } @@ -232,7 +275,7 @@ schema("/actions/:id") -> description => ?DESC("desc_api3"), parameters => [param_path_id()], responses => #{ - 200 => get_response_body_schema(), + 200 => actions_get_response_body_schema(), 404 => error_schema('NOT_FOUND', "Bridge not found") } }, @@ -242,11 +285,11 @@ schema("/actions/:id") -> description => ?DESC("desc_api4"), parameters => [param_path_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( - emqx_bridge_v2_schema:put_request(), - bridge_info_examples(put) + emqx_bridge_v2_schema:actions_put_request(), + bridge_info_examples(put, ?ROOT_KEY_ACTIONS) ), responses => #{ - 200 => get_response_body_schema(), + 200 => actions_get_response_body_schema(), 404 => error_schema('NOT_FOUND', "Bridge not found"), 400 => error_schema('BAD_REQUEST', "Update bridge failed") } @@ -371,8 +414,8 @@ schema("/actions_probe") -> desc => ?DESC("desc_api9"), summary => <<"Test creating bridge">>, 'requestBody' => emqx_dashboard_swagger:schema_with_examples( - emqx_bridge_v2_schema:post_request(), - bridge_info_examples(post) + emqx_bridge_v2_schema:actions_post_request(), + bridge_info_examples(post, ?ROOT_KEY_ACTIONS) ), responses => #{ 204 => <<"Test bridge OK">>, @@ -389,12 +432,223 @@ schema("/action_types") -> summary => <<"List available action types">>, responses => #{ 200 => emqx_dashboard_swagger:schema_with_examples( - array(emqx_bridge_v2_schema:types_sc()), + array(emqx_bridge_v2_schema:action_types_sc()), #{ <<"types">> => #{ summary => <<"Action types">>, - value => emqx_bridge_v2_schema:types() + value => emqx_bridge_v2_schema:action_types() + } + } + ) + } + } + }; +%%================================================================================ +%% Sources +%%================================================================================ +schema("/sources") -> + #{ + 'operationId' => '/sources', + get => #{ + tags => [<<"sources">>], + summary => <<"List sources">>, + description => ?DESC("desc_api1"), + responses => #{ + %% FIXME: examples + 200 => emqx_dashboard_swagger:schema_with_example( + array(emqx_bridge_v2_schema:sources_get_response()), + bridge_info_array_example(get, ?ROOT_KEY_SOURCES) + ) + } + }, + post => #{ + tags => [<<"sources">>], + summary => <<"Create source">>, + description => ?DESC("desc_api2"), + %% FIXME: examples + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_v2_schema:sources_post_request(), + bridge_info_examples(post, ?ROOT_KEY_SOURCES) + ), + responses => #{ + 201 => sources_get_response_body_schema(), + 400 => error_schema('ALREADY_EXISTS', "Source already exists") + } + } + }; +schema("/sources/:id") -> + #{ + 'operationId' => '/sources/:id', + get => #{ + tags => [<<"sources">>], + summary => <<"Get source">>, + description => ?DESC("desc_api3"), + parameters => [param_path_id()], + responses => #{ + 200 => sources_get_response_body_schema(), + 404 => error_schema('NOT_FOUND', "Source not found") + } + }, + put => #{ + tags => [<<"sources">>], + summary => <<"Update source">>, + description => ?DESC("desc_api4"), + parameters => [param_path_id()], + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_v2_schema:sources_put_request(), + bridge_info_examples(put, ?ROOT_KEY_SOURCES) + ), + responses => #{ + 200 => sources_get_response_body_schema(), + 404 => error_schema('NOT_FOUND', "Source not found"), + 400 => error_schema('BAD_REQUEST', "Update source failed") + } + }, + delete => #{ + tags => [<<"sources">>], + summary => <<"Delete source">>, + description => ?DESC("desc_api5"), + parameters => [param_path_id(), param_qs_delete_cascade()], + responses => #{ + 204 => <<"Source deleted">>, + 400 => error_schema( + 'BAD_REQUEST', + "Cannot delete bridge while active rules are defined for this source", + [{rules, mk(array(string()), #{desc => "Dependent Rule IDs"})}] + ), + 404 => error_schema('NOT_FOUND', "Source not found"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/sources/:id/metrics") -> + #{ + 'operationId' => '/sources/:id/metrics', + get => #{ + tags => [<<"sources">>], + summary => <<"Get source metrics">>, + description => ?DESC("desc_bridge_metrics"), + parameters => [param_path_id()], + responses => #{ + 200 => emqx_bridge_schema:metrics_fields(), + 404 => error_schema('NOT_FOUND', "Source not found") + } + } + }; +schema("/sources/:id/metrics/reset") -> + #{ + 'operationId' => '/sources/:id/metrics/reset', + put => #{ + tags => [<<"sources">>], + summary => <<"Reset source metrics">>, + description => ?DESC("desc_api6"), + parameters => [param_path_id()], + responses => #{ + 204 => <<"Reset success">>, + 404 => error_schema('NOT_FOUND', "Source not found") + } + } + }; +schema("/sources/:id/enable/:enable") -> + #{ + 'operationId' => '/sources/:id/enable/:enable', + put => + #{ + tags => [<<"sources">>], + summary => <<"Enable or disable bridge">>, + desc => ?DESC("desc_enable_bridge"), + parameters => [param_path_id(), param_path_enable()], + responses => + #{ + 204 => <<"Success">>, + 404 => error_schema( + 'NOT_FOUND', "Bridge not found or invalid operation" + ), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/sources/:id/:operation") -> + #{ + 'operationId' => '/sources/:id/:operation', + post => #{ + tags => [<<"sources">>], + summary => <<"Manually start a bridge">>, + description => ?DESC("desc_api7"), + parameters => [ + param_path_id(), + param_path_operation_cluster() + ], + responses => #{ + 204 => <<"Operation success">>, + 400 => error_schema( + 'BAD_REQUEST', "Problem with configuration of external service" + ), + 404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/nodes/:node/sources/:id/:operation") -> + #{ + 'operationId' => '/nodes/:node/sources/:id/:operation', + post => #{ + tags => [<<"sources">>], + summary => <<"Manually start a bridge on a given node">>, + description => ?DESC("desc_api8"), + parameters => [ + param_path_node(), + param_path_id(), + param_path_operation_on_node() + ], + responses => #{ + 204 => <<"Operation success">>, + 400 => error_schema( + 'BAD_REQUEST', + "Problem with configuration of external service or bridge not enabled" + ), + 404 => error_schema( + 'NOT_FOUND', "Bridge or node not found or invalid operation" + ), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/sources_probe") -> + #{ + 'operationId' => '/sources_probe', + post => #{ + tags => [<<"sources">>], + desc => ?DESC("desc_api9"), + summary => <<"Test creating bridge">>, + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_v2_schema:sources_post_request(), + bridge_info_examples(post, ?ROOT_KEY_SOURCES) + ), + responses => #{ + 204 => <<"Test bridge OK">>, + 400 => error_schema(['TEST_FAILED'], "bridge test failed") + } + } + }; +schema("/source_types") -> + #{ + 'operationId' => '/source_types', + get => #{ + tags => [<<"sources">>], + desc => ?DESC("desc_api10"), + summary => <<"List available source types">>, + responses => #{ + 200 => emqx_dashboard_swagger:schema_with_examples( + array(emqx_bridge_v2_schema:action_types_sc()), + #{ + <<"types">> => + #{ + summary => <<"Source types">>, + value => emqx_bridge_v2_schema:action_types() } } ) @@ -402,6 +656,12 @@ schema("/action_types") -> } }. +%%------------------------------------------------------------------------------ +%% Thin Handlers +%%------------------------------------------------------------------------------ +%%================================================================================ +%% Actions +%%================================================================================ '/actions'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> handle_create(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, Conf0); '/actions'(get, _Params) -> @@ -439,7 +699,48 @@ schema("/action_types") -> handle_probe(?ROOT_KEY_ACTIONS, Request). '/action_types'(get, _Request) -> - ?OK(emqx_bridge_v2_schema:types()). + ?OK(emqx_bridge_v2_schema:action_types()). +%%================================================================================ +%% Sources +%%================================================================================ +'/sources'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> + handle_create(?ROOT_KEY_SOURCES, BridgeType, BridgeName, Conf0); +'/sources'(get, _Params) -> + handle_list(?ROOT_KEY_SOURCES). + +'/sources/:id'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes(?ROOT_KEY_SOURCES, BridgeType, BridgeName, 200)); +'/sources/:id'(put, #{bindings := #{id := Id}, body := Conf0}) -> + handle_update(?ROOT_KEY_SOURCES, Id, Conf0); +'/sources/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> + handle_delete(?ROOT_KEY_SOURCES, Id, Qs). + +'/sources/:id/metrics'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(?ROOT_KEY_SOURCES, BridgeType, BridgeName)). + +'/sources/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> + handle_reset_metrics(?ROOT_KEY_SOURCES, Id). + +'/sources/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> + handle_disable_enable(?ROOT_KEY_SOURCES, Id, Enable). + +'/sources/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op} +}) -> + handle_operation(?ROOT_KEY_SOURCES, Id, Op). + +'/nodes/:node/sources/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op, node := Node} +}) -> + handle_node_operation(?ROOT_KEY_SOURCES, Node, Id, Op). + +'/sources_probe'(post, Request) -> + handle_probe(?ROOT_KEY_SOURCES, Request). + +'/source_types'(get, _Request) -> + ?OK(emqx_bridge_v2_schema:source_types()). %%------------------------------------------------------------------------------ %% Handlers @@ -451,7 +752,7 @@ handle_list(ConfRootKey) -> case is_ok(NodeReplies) of {ok, NodeBridges} -> AllBridges = [ - [format_resource(Data, Node) || Data <- Bridges] + [format_resource(ConfRootKey, Data, Node) || Data <- Bridges] || {Node, Bridges} <- lists:zip(Nodes, NodeBridges) ], ?OK(zip_bridges(AllBridges)); @@ -574,7 +875,12 @@ handle_node_operation(ConfRootKey, Node, Id, Op) -> ). handle_probe(ConfRootKey, Request) -> - RequestMeta = #{module => ?MODULE, method => post, path => "/actions_probe"}, + Path = + case ConfRootKey of + ?ROOT_KEY_ACTIONS -> "/actions_probe"; + ?ROOT_KEY_SOURCES -> "/sources_probe" + end, + RequestMeta = #{module => ?MODULE, method => post, path => Path}, case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of {ok, #{body := #{<<"type">> := Type} = Params}} -> Params1 = maybe_deobfuscate_bridge_probe(Params), @@ -664,8 +970,8 @@ get_metrics_from_all_nodes(ConfRootKey, Type, Name) -> ?INTERNAL_ERROR(Reason) end. -operation_func(all, start) -> v2_start_bridge_to_all_nodes_v6; -operation_func(_Node, start) -> v2_start_bridge_to_node_v6; +operation_func(all, start) -> v2_start_bridge_on_all_nodes_v6; +operation_func(_Node, start) -> v2_start_bridge_on_node_v6; operation_func(all, lookup) -> v2_lookup_from_all_nodes_v6; operation_func(all, list) -> v2_list_bridges_on_nodes_v6; operation_func(all, get_metrics) -> v2_get_metrics_from_all_nodes_v6. @@ -825,7 +1131,7 @@ aggregate_status(AllStatus) -> %% RPC Target lookup_from_local_node(BridgeType, BridgeName) -> case emqx_bridge_v2:lookup(BridgeType, BridgeName) of - {ok, Res} -> {ok, format_resource(Res, node())}; + {ok, Res} -> {ok, format_resource(?ROOT_KEY_ACTIONS, Res, node())}; Error -> Error end. @@ -833,7 +1139,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> -spec lookup_from_local_node_v6(emqx_bridge_v2:root_cfg_key(), _, _) -> _. lookup_from_local_node_v6(ConfRootKey, BridgeType, BridgeName) -> case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of - {ok, Res} -> {ok, format_resource(Res, node())}; + {ok, Res} -> {ok, format_resource(ConfRootKey, Res, node())}; Error -> Error end. @@ -847,6 +1153,7 @@ get_metrics_from_local_node_v6(ConfRootKey, Type, Name) -> %% resource format_resource( + ConfRootKey, #{ type := Type, name := Name, @@ -857,7 +1164,7 @@ format_resource( }, Node ) -> - RawConf = fill_defaults(Type, RawConf0), + RawConf = fill_defaults(ConfRootKey, Type, RawConf0), redact( maps:merge( RawConf#{ @@ -988,17 +1295,18 @@ aggregate_metrics( M17 + N17 ). -fill_defaults(Type, RawConf) -> - PackedConf = pack_bridge_conf(Type, RawConf), +fill_defaults(ConfRootKey, Type, RawConf) -> + PackedConf = pack_bridge_conf(ConfRootKey, Type, RawConf), FullConf = emqx_config:fill_defaults(emqx_bridge_v2_schema, PackedConf, #{}), - unpack_bridge_conf(Type, FullConf). + unpack_bridge_conf(ConfRootKey, Type, FullConf). -pack_bridge_conf(Type, RawConf) -> - #{<<"actions">> => #{bin(Type) => #{<<"foo">> => RawConf}}}. +pack_bridge_conf(ConfRootKey, Type, RawConf) -> + #{bin(ConfRootKey) => #{bin(Type) => #{<<"foo">> => RawConf}}}. -unpack_bridge_conf(Type, PackedConf) -> +unpack_bridge_conf(ConfRootKey, Type, PackedConf) -> + ConfRootKeyBin = bin(ConfRootKey), TypeBin = bin(Type), - #{<<"actions">> := Bridges} = PackedConf, + #{ConfRootKeyBin := Bridges} = PackedConf, #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges), RawConf. diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl index d6fe68466..fbcef8b5c 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl @@ -35,8 +35,8 @@ v2_lookup_from_all_nodes_v6/4, v2_list_bridges_on_nodes_v6/2, v2_get_metrics_from_all_nodes_v6/4, - v2_start_bridge_to_node_v6/4, - v2_start_bridge_to_all_nodes_v6/4 + v2_start_bridge_on_node_v6/4, + v2_start_bridge_on_all_nodes_v6/4 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -173,9 +173,9 @@ v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, ActionType, ActionName) -> ?TIMEOUT ). --spec v2_start_bridge_to_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) -> +-spec v2_start_bridge_on_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) -> emqx_rpc:erpc_multicall(ok). -v2_start_bridge_to_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) -> +v2_start_bridge_on_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) -> erpc:multicall( Nodes, emqx_bridge_v2, @@ -184,9 +184,9 @@ v2_start_bridge_to_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) -> ?TIMEOUT ). --spec v2_start_bridge_to_node_v6(node(), emqx_bridge_v2:root_cfg_key(), key(), key()) -> +-spec v2_start_bridge_on_node_v6(node(), emqx_bridge_v2:root_cfg_key(), key(), key()) -> term(). -v2_start_bridge_to_node_v6(Node, ConfRootKey, BridgeType, BridgeName) -> +v2_start_bridge_on_node_v6(Node, ConfRootKey, BridgeType, BridgeName) -> rpc:call( Node, emqx_bridge_v2, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 28017f814..ec9314fd2 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -28,21 +28,31 @@ -export([roots/0, fields/1, desc/1, namespace/0, tags/0]). -export([ - get_response/0, - put_request/0, - post_request/0, - examples/1, + actions_get_response/0, + actions_put_request/0, + actions_post_request/0, + actions_examples/1, action_values/4 ]). +-export([ + sources_get_response/0, + sources_put_request/0, + sources_post_request/0, + sources_examples/1, + source_values/4 +]). + %% Exported for mocking %% TODO: refactor emqx_bridge_v1_compatibility_layer_SUITE so we don't need to %% export this -export([ - registered_api_schemas/1 + registered_actions_api_schemas/1, + registered_sources_api_schemas/1 ]). --export([types/0, types_sc/0]). +-export([action_types/0, action_types_sc/0]). +-export([source_types/0, source_types_sc/0]). -export([resource_opts_fields/0, resource_opts_fields/1]). -export([ @@ -58,33 +68,140 @@ -export([actions_convert_from_connectors/1]). --export_type([action_type/0]). +-export_type([action_type/0, source_type/0]). %% Should we explicitly list them here so dialyzer may be more helpful? -type action_type() :: atom(). +-type source_type() :: atom(). +-type http_method() :: get | post | put. +-type schema_example_map() :: #{atom() => term()}. %%====================================================================================== %% For HTTP APIs -get_response() -> - api_schema("get"). +%%====================================================================================== -put_request() -> - api_schema("put"). +%%--------------------------------------------- +%% Actions +%%--------------------------------------------- -post_request() -> - api_schema("post"). +actions_get_response() -> + actions_api_schema("get"). -api_schema(Method) -> - APISchemas = ?MODULE:registered_api_schemas(Method), +actions_put_request() -> + actions_api_schema("put"). + +actions_post_request() -> + actions_api_schema("post"). + +actions_api_schema(Method) -> + APISchemas = ?MODULE:registered_actions_api_schemas(Method), hoconsc:union(bridge_api_union(APISchemas)). -registered_api_schemas(Method) -> +registered_actions_api_schemas(Method) -> RegisteredSchemas = emqx_action_info:registered_schema_modules_actions(), [ api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") || {BridgeV2Type, SchemaModule} <- RegisteredSchemas ]. +-spec action_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map(). +action_values(Method, ActionType, ConnectorType, ActionValues) -> + ActionTypeBin = atom_to_binary(ActionType), + ConnectorTypeBin = atom_to_binary(ConnectorType), + lists:foldl( + fun(M1, M2) -> + maps:merge(M1, M2) + end, + #{ + enable => true, + description => <<"My example ", ActionTypeBin/binary, " action">>, + connector => <>, + resource_opts => #{ + health_check_interval => "30s" + } + }, + [ + ActionValues, + method_values(action, Method, ActionType) + ] + ). + +actions_examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, + SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_actions()], + lists:foldl(Fun, #{}, SchemaModules). + +%%--------------------------------------------- +%% Sources +%%--------------------------------------------- + +sources_get_response() -> + sources_api_schema("get"). + +sources_put_request() -> + sources_api_schema("put"). + +sources_post_request() -> + sources_api_schema("post"). + +sources_api_schema(Method) -> + APISchemas = ?MODULE:registered_sources_api_schemas(Method), + hoconsc:union(bridge_api_union(APISchemas)). + +registered_sources_api_schemas(Method) -> + RegisteredSchemas = emqx_action_info:registered_schema_modules_sources(), + [ + api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_source") + || {BridgeV2Type, SchemaModule} <- RegisteredSchemas + ]. + +-spec source_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map(). +source_values(Method, SourceType, ConnectorType, SourceValues) -> + SourceTypeBin = atom_to_binary(SourceType), + ConnectorTypeBin = atom_to_binary(ConnectorType), + lists:foldl( + fun(M1, M2) -> + maps:merge(M1, M2) + end, + #{ + enable => true, + description => <<"My example ", SourceTypeBin/binary, " source">>, + connector => <>, + resource_opts => #{ + health_check_interval => "30s" + } + }, + [ + SourceValues, + method_values(source, Method, SourceType) + ] + ). + +sources_examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, + SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_sources()], + lists:foldl(Fun, #{}, SchemaModules). + +%%--------------------------------------------- +%% Common helpers +%%--------------------------------------------- + api_ref(Module, Type, Method) -> {Type, ref(Module, Method)}. @@ -111,41 +228,17 @@ bridge_api_union(Refs) -> end end. --type http_method() :: get | post | put. --type schema_example_map() :: #{atom() => term()}. - --spec action_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map(). -action_values(Method, ActionType, ConnectorType, ActionValues) -> - ActionTypeBin = atom_to_binary(ActionType), - ConnectorTypeBin = atom_to_binary(ConnectorType), - lists:foldl( - fun(M1, M2) -> - maps:merge(M1, M2) - end, - #{ - enable => true, - description => <<"My example ", ActionTypeBin/binary, " action">>, - connector => <>, - resource_opts => #{ - health_check_interval => "30s" - } - }, - [ - ActionValues, - method_values(Method, ActionType) - ] - ). - --spec method_values(http_method(), atom()) -> schema_example_map(). -method_values(post, Type) -> +-spec method_values(action | source, http_method(), atom()) -> schema_example_map(). +method_values(Kind, post, Type) -> + KindBin = atom_to_binary(Kind), TypeBin = atom_to_binary(Type), #{ - name => <>, + name => <>, type => TypeBin }; -method_values(get, Type) -> +method_values(Kind, get, Type) -> maps:merge( - method_values(post, Type), + method_values(Kind, post, Type), #{ status => <<"connected">>, node_status => [ @@ -156,7 +249,7 @@ method_values(get, Type) -> ] } ); -method_values(put, _Type) -> +method_values(_Kind, put, _Type) -> #{}. api_fields("get_bridge_v2", Type, Fields) -> @@ -175,16 +268,33 @@ api_fields("post_bridge_v2", Type, Fields) -> ] ); api_fields("put_bridge_v2", _Type, Fields) -> + Fields; +api_fields("get_source", Type, Fields) -> + lists:append( + [ + emqx_bridge_schema:type_and_name_fields(Type), + emqx_bridge_schema:status_fields(), + Fields + ] + ); +api_fields("post_source", Type, Fields) -> + lists:append( + [ + emqx_bridge_schema:type_and_name_fields(Type), + Fields + ] + ); +api_fields("put_source", _Type, Fields) -> Fields. %%====================================================================================== %% HOCON Schema Callbacks %%====================================================================================== -namespace() -> "actions". +namespace() -> "actions_and_sources". tags() -> - [<<"Actions">>]. + [<<"Actions">>, <<"Sources">>]. -dialyzer({nowarn_function, roots/0}). @@ -231,13 +341,21 @@ desc(resource_opts) -> desc(_) -> undefined. --spec types() -> [action_type()]. -types() -> +-spec action_types() -> [action_type()]. +action_types() -> proplists:get_keys(?MODULE:fields(actions)). --spec types_sc() -> ?ENUM([action_type()]). -types_sc() -> - hoconsc:enum(types()). +-spec action_types_sc() -> ?ENUM([action_type()]). +action_types_sc() -> + hoconsc:enum(action_types()). + +-spec source_types() -> [source_type()]. +source_types() -> + proplists:get_keys(?MODULE:fields(sources)). + +-spec source_types_sc() -> ?ENUM([source_type()]). +source_types_sc() -> + hoconsc:enum(source_types()). resource_opts_fields() -> resource_opts_fields(_Overrides = []). @@ -268,19 +386,6 @@ resource_opts_fields(Overrides) -> emqx_resource_schema:create_opts(Overrides) ). -examples(Method) -> - MergeFun = - fun(Example, Examples) -> - maps:merge(Examples, Example) - end, - Fun = - fun(Module, Examples) -> - ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), - lists:foldl(MergeFun, Examples, ConnectorExamples) - end, - SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_actions()], - lists:foldl(Fun, #{}, SchemaModules). - top_level_common_action_keys() -> [ <<"connector">>, diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index dadc0a09c..b67791cb3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -104,7 +104,7 @@ setup_mocks() -> catch meck:new(emqx_bridge_v2_schema, MeckOpts), meck:expect( emqx_bridge_v2_schema, - registered_api_schemas, + registered_actions_api_schemas, 1, fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v2_" ++ Method)}] diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index f7dd74161..88788d6e2 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -12,6 +12,9 @@ -import(emqx_common_test_helpers, [on_exit/1]). +-define(ROOT_KEY_ACTIONS, actions). +-define(ROOT_KEY_SOURCES, sources). + %% ct setup helpers init_per_suite(Config, Apps) -> @@ -152,6 +155,49 @@ create_bridge(Config, Overrides) -> ct:pal("creating bridge with config: ~p", [BridgeConfig]), emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig). +get_ct_config_with_fallback(Config, [Key]) -> + ?config(Key, Config); +get_ct_config_with_fallback(Config, [Key | Rest]) -> + case ?config(Key, Config) of + undefined -> + get_ct_config_with_fallback(Config, Rest); + X -> + X + end. + +get_config_by_kind(Config, Overrides) -> + Kind = ?config(bridge_kind, Config), + get_config_by_kind(Kind, Config, Overrides). + +get_config_by_kind(Kind, Config, Overrides) -> + case Kind of + action -> + %% TODO: refactor tests to use action_type... + ActionType = get_ct_config_with_fallback(Config, [action_type, bridge_type]), + ActionName = get_ct_config_with_fallback(Config, [action_name, bridge_name]), + ActionConfig0 = get_ct_config_with_fallback(Config, [action_config, bridge_config]), + ActionConfig = emqx_utils_maps:deep_merge(ActionConfig0, Overrides), + #{type => ActionType, name => ActionName, config => ActionConfig}; + source -> + SourceType = ?config(source_type, Config), + SourceName = ?config(source_name, Config), + SourceConfig0 = ?config(source_config, Config), + SourceConfig = emqx_utils_maps:deep_merge(SourceConfig0, Overrides), + #{type => SourceType, name => SourceName, config => SourceConfig} + end. + +api_path_root(Kind) -> + case Kind of + action -> "actions"; + source -> "sources" + end. + +conf_root_key(Kind) -> + case Kind of + action -> ?ROOT_KEY_ACTIONS; + source -> ?ROOT_KEY_SOURCES + end. + maybe_json_decode(X) -> case emqx_utils_json:safe_decode(X, [return_maps]) of {ok, Decoded} -> Decoded; @@ -218,26 +264,26 @@ create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). create_bridge_api(Config, Overrides) -> - BridgeType = ?config(bridge_type, Config), - BridgeName = ?config(bridge_name, Config), - BridgeConfig0 = ?config(bridge_config, Config), - BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), - {ok, {{_, 201, _}, _, _}} = create_connector_api(Config), + create_kind_api(Config, Overrides). - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, - Path = emqx_mgmt_api_test_util:api_path(["actions"]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, - ct:pal("creating bridge (via http): ~p", [Params]), - Res = - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of - {ok, {Status, Headers, Body0}} -> - {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; - Error -> - Error - end, - ct:pal("bridge create result: ~p", [Res]), +create_kind_api(Config) -> + create_kind_api(Config, _Overrides = #{}). + +create_kind_api(Config, Overrides) -> + Kind = proplists:get_value(bridge_kind, Config, action), + #{ + type := Type, + name := Name, + config := BridgeConfig + } = get_config_by_kind(Kind, Config, Overrides), + Params = BridgeConfig#{<<"type">> => Type, <<"name">> => Name}, + PathRoot = api_path_root(Kind), + Path = emqx_mgmt_api_test_util:api_path([PathRoot]), + ct:pal("creating bridge (~s, http):\n ~p", [Kind, Params]), + Method = post, + Res = request(Method, Path, Params), + ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]), Res. create_connector_api(Config) -> @@ -288,27 +334,29 @@ update_bridge_api(Config) -> update_bridge_api(Config, _Overrides = #{}). update_bridge_api(Config, Overrides) -> - BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - BridgeConfig0 = ?config(bridge_config, Config), - BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), - BridgeId = emqx_bridge_resource:bridge_id(BridgeType, Name), - Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, - ct:pal("updating bridge (via http): ~p", [BridgeConfig]), - Res = - case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, BridgeConfig, Opts) of - {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])}; - Error -> Error - end, - ct:pal("bridge update result: ~p", [Res]), + Kind = proplists:get_value(bridge_kind, Config, action), + #{ + type := Type, + name := Name, + config := Params + } = get_config_by_kind(Kind, Config, Overrides), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + PathRoot = api_path_root(Kind), + Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId]), + ct:pal("updating bridge (~s, http):\n ~p", [Kind, Params]), + Method = put, + Res = request(Method, Path, Params), + ct:pal("update bridge (~s, http) result:\n ~p", [Kind, Res]), Res. op_bridge_api(Op, BridgeType, BridgeName) -> + op_bridge_api(_Kind = action, Op, BridgeType, BridgeName). + +op_bridge_api(Kind, Op, BridgeType, BridgeName) -> BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), - Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId, Op]), - ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), + PathRoot = api_path_root(Kind), + Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId, Op]), + ct:pal("calling bridge ~p (~s, http):\n ~p", [BridgeId, Kind, Op]), Method = post, Params = [], Res = request(Method, Path, Params), @@ -326,17 +374,16 @@ probe_bridge_api(Config, Overrides) -> probe_bridge_api(BridgeType, BridgeName, BridgeConfig). probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + probe_bridge_api(action, BridgeType, BridgeName, BridgeConfig). + +probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) -> Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, - Path = emqx_mgmt_api_test_util:api_path(["actions_probe"]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, - ct:pal("probing bridge (via http): ~p", [Params]), - Res = - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of - {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; - Error -> Error - end, - ct:pal("bridge probe result: ~p", [Res]), + PathRoot = api_path_root(Kind), + Path = emqx_mgmt_api_test_util:api_path([PathRoot ++ "_probe"]), + ct:pal("probing bridge (~s, http):\n ~p", [Kind, Params]), + Method = post, + Res = request(Method, Path, Params), + ct:pal("bridge probe (~s, http) result:\n ~p", [Kind, Res]), Res. list_bridges_http_api_v1() -> @@ -353,6 +400,13 @@ list_actions_http_api() -> ct:pal("list actions (http v2) result:\n ~p", [Res]), Res. +list_sources_http_api() -> + Path = emqx_mgmt_api_test_util:api_path(["sources"]), + ct:pal("list sources (http v2)"), + Res = request(get, Path, _Params = []), + ct:pal("list sources (http v2) result:\n ~p", [Res]), + Res. + list_connectors_http_api() -> Path = emqx_mgmt_api_test_util:api_path(["connectors"]), ct:pal("list connectors"), @@ -506,13 +560,6 @@ t_create_via_http(Config) -> begin ?assertMatch({ok, _}, create_bridge_api(Config)), - %% lightweight matrix testing some configs - ?assertMatch( - {ok, _}, - update_bridge_api( - Config - ) - ), ?assertMatch( {ok, _}, update_bridge_api( @@ -526,23 +573,26 @@ t_create_via_http(Config) -> ok. t_start_stop(Config, StopTracePoint) -> - BridgeType = ?config(bridge_type, Config), - BridgeName = ?config(bridge_name, Config), - BridgeConfig = ?config(bridge_config, Config), + Kind = proplists:get_value(bridge_kind, Config, action), ConnectorName = ?config(connector_name, Config), ConnectorType = ?config(connector_type, Config), - ConnectorConfig = ?config(connector_config, Config), + #{ + type := Type, + name := Name, + config := BridgeConfig + } = get_config_by_kind(Kind, Config, _Overrides = #{}), ?assertMatch( - {ok, _}, - emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig) + {ok, {{_, 201, _}, _, _}}, + create_connector_api(Config) ), ?check_trace( begin ProbeRes0 = probe_bridge_api( - BridgeType, - BridgeName, + Kind, + Type, + Name, BridgeConfig ), ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), @@ -550,8 +600,9 @@ t_start_stop(Config, StopTracePoint) -> AtomsBefore = erlang:system_info(atom_count), %% Probe again; shouldn't have created more atoms. ProbeRes1 = probe_bridge_api( - BridgeType, - BridgeName, + Kind, + Type, + Name, BridgeConfig ), @@ -559,9 +610,9 @@ t_start_stop(Config, StopTracePoint) -> AtomsAfter = erlang:system_info(atom_count), ?assertEqual(AtomsBefore, AtomsAfter), - ?assertMatch({ok, _}, emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig)), + ?assertMatch({ok, _}, create_kind_api(Config)), - ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), + ResourceId = emqx_bridge_resource:resource_id(conf_root_key(Kind), Type, Name), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -574,7 +625,7 @@ t_start_stop(Config, StopTracePoint) -> %% `start` bridge to trigger `already_started` ?assertMatch( {ok, {{_, 204, _}, _Headers, []}}, - emqx_bridge_v2_testlib:op_bridge_api("start", BridgeType, BridgeName) + op_bridge_api(Kind, "start", Type, Name) ), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), @@ -624,10 +675,10 @@ t_start_stop(Config, StopTracePoint) -> ) ), - ok + #{resource_id => ResourceId} end, - fun(Trace) -> - ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), + fun(Res, Trace) -> + #{resource_id := ResourceId} = Res, %% one for each probe, one for real ?assertMatch( [_, _, #{instance_id := ResourceId}], diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl index e90100995..c64b1f2cb 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_tests.erl @@ -108,7 +108,7 @@ connector_resource_opts_test() -> ok. actions_api_spec_post_fields_test() -> - ?UNION(Union) = emqx_bridge_v2_schema:post_request(), + ?UNION(Union) = emqx_bridge_v2_schema:actions_post_request(), Schemas = lists:map( fun(?R_REF(SchemaMod, StructName)) -> diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index dbdf68ef1..18af6ee11 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -237,7 +237,9 @@ on_stop(ResourceId, State) -> ets:delete(TopicToHandlerIndex) end, Allocated = emqx_resource:get_allocated_resources(ResourceId), - ok = stop_helper(Allocated). + ok = stop_helper(Allocated), + ?tp(mqtt_connector_stopped, #{instance_id => ResourceId}), + ok. stop_helper(#{pool_name := PoolName}) -> emqx_resource_pool:stop(PoolName). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl index 6d075334a..f765581f9 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl @@ -27,6 +27,9 @@ conn_bridge_examples/1 ]). +-define(ACTION_TYPE, mqtt). +-define(SOURCE_TYPE, mqtt). + %%====================================================================================== %% Hocon Schema Definitions namespace() -> "bridge_mqtt_publisher". @@ -86,14 +89,18 @@ fields(action_resource_opts) -> fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, emqx_bridge_v2_schema:resource_opts_fields() ); -fields("get_connector") -> - emqx_bridge_mqtt_connector_schema:fields("config_connector"); -fields("get_bridge_v2") -> - fields("mqtt_publisher_action"); -fields("post_bridge_v2") -> - fields("mqtt_publisher_action") ++ emqx_bridge_schema:type_and_name_fields(mqtt); -fields("put_bridge_v2") -> - fields("mqtt_publisher_action"); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields("mqtt_publisher_action")); +fields(Field) when + Field == "get_source"; + Field == "post_source"; + Field == "put_source" +-> + emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields("mqtt_subscriber_source")); fields(What) -> error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}). %% v2: api schema diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl new file mode 100644 index 000000000..fde15a1b6 --- /dev/null +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -0,0 +1,175 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_mqtt_v2_subscriber_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("emqx/include/asserts.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_mqtt, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, Api} = emqx_common_test_http:create_default_app(), + [ + {apps, Apps}, + {api, Api} + | Config + ]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(TestCase, Config) -> + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]), + ConnectorConfig = connector_config(), + SourceConfig = source_config(#{connector => Name}), + [ + {bridge_kind, source}, + {source_type, mqtt}, + {source_name, Name}, + {source_config, SourceConfig}, + {connector_type, mqtt}, + {connector_name, Name}, + {connector_config, ConnectorConfig} + | Config + ]. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +connector_config() -> + %% !!!!!!!!!!!! FIXME!!!!!! add more fields ("server_configs") + #{ + <<"enable">> => true, + <<"description">> => <<"my connector">>, + <<"pool_size">> => 3, + <<"server">> => <<"127.0.0.1:1883">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }. + +source_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + CommonConfig = + #{ + <<"enable">> => true, + <<"connector">> => <<"please override">>, + <<"parameters">> => + #{ + <<"remote">> => + #{ + <<"topic">> => <<"remote/topic">>, + <<"qos">> => 2 + }, + <<"local">> => + #{ + <<"topic">> => <<"local/topic">>, + <<"qos">> => 2, + <<"retain">> => false, + <<"payload">> => <<"${payload}">> + } + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"resume_interval">> => <<"15s">>, + <<"worker_pool_size">> => <<"1">> + } + }, + maps:merge(CommonConfig, Overrides). + +replace(Key, Value, Proplist) -> + lists:keyreplace(Key, 1, Proplist, {Key, Value}). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_create_via_http(Config) -> + ConnectorName = ?config(connector_name, Config), + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"enable">> := true, + <<"status">> := <<"connected">> + } + ]}}, + emqx_bridge_v2_testlib:list_bridges_http_api_v1() + ), + NewSourceName = <<"my_other_source">>, + {ok, {{_, 201, _}, _, _}} = + emqx_bridge_v2_testlib:create_kind_api( + replace(source_name, NewSourceName, Config) + ), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{<<"connector">> := ConnectorName}, + #{<<"connector">> := ConnectorName} + ]}}, + emqx_bridge_v2_testlib:list_sources_http_api() + ), + ?assertMatch( + {ok, {{_, 200, _}, _, []}}, + emqx_bridge_v2_testlib:list_bridges_http_api_v1() + ), + ok. + +t_start_stop(Config) -> + ok = emqx_bridge_v2_testlib:t_start_stop(Config, mqtt_connector_stopped), + ok.