From 048f4724a9f5d853840bc49139e5c1d99bfbec25 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 27 Nov 2023 16:09:17 +0100 Subject: [PATCH] feat(emqx_connector): add field 'actions' in API response Also unify schemas, use emqx_connector_schema for the generic parts. --- .../src/schema/emqx_bridge_v2_schema.erl | 2 +- .../src/emqx_bridge_azure_event_hub.erl | 51 ++++++----- .../src/emqx_bridge_confluent_producer.erl | 51 ++++++----- ...emqx_bridge_gcp_pubsub_producer_schema.erl | 26 +++--- .../src/emqx_bridge_kafka.erl | 29 ++++++- .../src/emqx_bridge_matrix.erl | 14 +-- .../src/emqx_bridge_mongodb.erl | 41 ++++----- .../src/emqx_bridge_syskeeper_connector.erl | 28 +++--- .../src/emqx_bridge_syskeeper_proxy.erl | 31 +++---- .../src/emqx_bridge_timescale.erl | 16 ++-- .../emqx_connector/src/emqx_connector_api.erl | 12 ++- .../src/schema/emqx_connector_schema.erl | 85 ++++++++++++++++++- .../test/emqx_connector_api_SUITE.erl | 82 +++++++++++++++--- .../emqx_postgresql_connector_schema.erl | 31 ++++--- rel/i18n/emqx_connector_schema.hocon | 45 ++++++++++ 15 files changed, 401 insertions(+), 143 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 25619d99a..188a550fc 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -250,7 +250,7 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> false; _ -> {true, #{ - schema_modle => Module, + schema_module => Module, type_name => TypeName, missing_fields => MissingFileds }} diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 569725a34..cf733ddfd 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -31,8 +31,8 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --define(AEH_CONNECTOR_TYPE, azure_event_hub_producer). --define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>). +-define(CONNECTOR_TYPE, azure_event_hub_producer). +-define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>). %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API @@ -42,18 +42,17 @@ namespace() -> "bridge_azure_event_hub". roots() -> ["config_producer"]. -fields("put_connector") -> +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> Fields = override( - emqx_bridge_kafka:fields("put_connector"), - connector_overrides() - ), - override_documentations(Fields); -fields("get_connector") -> - emqx_bridge_schema:status_fields() ++ - fields("post_connector"); -fields("post_connector") -> - Fields = override( - emqx_bridge_kafka:fields("post_connector"), + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + emqx_bridge_kafka:kafka_connector_config_fields() + ), connector_overrides() ), override_documentations(Fields); @@ -170,7 +169,7 @@ struct_names() -> bridge_v2_examples(Method) -> [ #{ - ?AEH_CONNECTOR_TYPE_BIN => #{ + ?CONNECTOR_TYPE_BIN => #{ summary => <<"Azure Event Hub Action">>, value => values({Method, bridge_v2}) } @@ -180,7 +179,7 @@ bridge_v2_examples(Method) -> connector_examples(Method) -> [ #{ - ?AEH_CONNECTOR_TYPE_BIN => #{ + ?CONNECTOR_TYPE_BIN => #{ summary => <<"Azure Event Hub Connector">>, value => values({Method, connector}) } @@ -197,6 +196,20 @@ conn_bridge_examples(Method) -> } ]. +values({get, connector}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ], + actions => [<<"my_action">>] + }, + values({post, connector}) + ); values({get, AEHType}) -> maps:merge( #{ @@ -217,7 +230,7 @@ values({post, bridge_v2}) -> enable => true, connector => <<"my_azure_event_hub_producer_connector">>, name => <<"my_azure_event_hub_producer_action">>, - type => ?AEH_CONNECTOR_TYPE_BIN + type => ?CONNECTOR_TYPE_BIN } ); values({post, connector}) -> @@ -225,7 +238,7 @@ values({post, connector}) -> values(common_config), #{ name => <<"my_azure_event_hub_producer_connector">>, - type => ?AEH_CONNECTOR_TYPE_BIN, + type => ?CONNECTOR_TYPE_BIN, ssl => #{ enable => true, server_name_indication => <<"auto">>, @@ -358,7 +371,7 @@ connector_overrides() -> } ), type => mk( - ?AEH_CONNECTOR_TYPE, + ?CONNECTOR_TYPE, #{ required => true, desc => ?DESC("connector_type") @@ -414,7 +427,7 @@ bridge_v2_overrides() -> }), ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}), type => mk( - ?AEH_CONNECTOR_TYPE, + ?CONNECTOR_TYPE, #{ required => true, desc => ?DESC("bridge_v2_type") diff --git a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl index 8742d7ccf..a43a8a285 100644 --- a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl +++ b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl @@ -30,8 +30,8 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --define(CONFLUENT_CONNECTOR_TYPE, confluent_producer). --define(CONFLUENT_CONNECTOR_TYPE_BIN, <<"confluent_producer">>). +-define(CONNECTOR_TYPE, confluent_producer). +-define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>). %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API @@ -41,18 +41,17 @@ namespace() -> "confluent". roots() -> ["config_producer"]. -fields("put_connector") -> +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> Fields = override( - emqx_bridge_kafka:fields("put_connector"), - connector_overrides() - ), - override_documentations(Fields); -fields("get_connector") -> - emqx_bridge_schema:status_fields() ++ - fields("post_connector"); -fields("post_connector") -> - Fields = override( - emqx_bridge_kafka:fields("post_connector"), + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + emqx_bridge_kafka:kafka_connector_config_fields() + ), connector_overrides() ), override_documentations(Fields); @@ -155,7 +154,7 @@ struct_names() -> bridge_v2_examples(Method) -> [ #{ - ?CONFLUENT_CONNECTOR_TYPE_BIN => #{ + ?CONNECTOR_TYPE_BIN => #{ summary => <<"Confluent Action">>, value => values({Method, bridge_v2}) } @@ -165,13 +164,27 @@ bridge_v2_examples(Method) -> connector_examples(Method) -> [ #{ - ?CONFLUENT_CONNECTOR_TYPE_BIN => #{ + ?CONNECTOR_TYPE_BIN => #{ summary => <<"Confluent Connector">>, value => values({Method, connector}) } } ]. +values({get, connector}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ], + actions => [<<"my_action">>] + }, + values({post, connector}) + ); values({get, ConfluentType}) -> maps:merge( #{ @@ -192,7 +205,7 @@ values({post, bridge_v2}) -> enable => true, connector => <<"my_confluent_producer_connector">>, name => <<"my_confluent_producer_action">>, - type => ?CONFLUENT_CONNECTOR_TYPE_BIN + type => ?CONNECTOR_TYPE_BIN } ); values({post, connector}) -> @@ -200,7 +213,7 @@ values({post, connector}) -> values(common_config), #{ name => <<"my_confluent_producer_connector">>, - type => ?CONFLUENT_CONNECTOR_TYPE_BIN, + type => ?CONNECTOR_TYPE_BIN, ssl => #{ enable => true, server_name_indication => <<"auto">>, @@ -320,7 +333,7 @@ connector_overrides() -> } ), type => mk( - ?CONFLUENT_CONNECTOR_TYPE, + ?CONNECTOR_TYPE, #{ required => true, desc => ?DESC("connector_type") @@ -342,7 +355,7 @@ bridge_v2_overrides() -> } }), type => mk( - ?CONFLUENT_CONNECTOR_TYPE, + ?CONNECTOR_TYPE, #{ required => true, desc => ?DESC("bridge_v2_type") diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl index 0ee625824..a4c939d7a 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl @@ -24,6 +24,8 @@ connector_examples/1 ]). +-define(CONNECTOR_TYPE, gcp_pubsub_producer). + %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API %%------------------------------------------------------------------------------------------------- @@ -68,8 +70,7 @@ fields(action_parameters) -> fields("config_connector") -> %% FIXME emqx_connector_schema:common_fields() ++ - emqx_bridge_gcp_pubsub:fields(connector_config) ++ - emqx_resource_schema:fields("resource_opts"); + connector_config_fields(); %%========================================= %% HTTP API fields: action %%========================================= @@ -82,12 +83,16 @@ fields("put_bridge_v2") -> %%========================================= %% HTTP API fields: connector %%========================================= -fields("get_connector") -> - emqx_bridge_schema:status_fields() ++ fields("post_connector"); -fields("post_connector") -> - [type_field(), name_field() | fields("put_connector")]; -fields("put_connector") -> - fields("config_connector"). +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, connector_config_fields()). + +connector_config_fields() -> + emqx_bridge_gcp_pubsub:fields(connector_config) ++ + emqx_resource_schema:fields("resource_opts"). desc("config_connector") -> ?DESC("config_connector"); @@ -177,7 +182,7 @@ action_example(put) -> connector_example(get) -> maps:merge( - connector_example(put), + connector_example(post), #{ status => <<"connected">>, node_status => [ @@ -185,7 +190,8 @@ connector_example(get) -> node => <<"emqx@localhost">>, status => <<"connected">> } - ] + ], + actions => [<<"my_action">>] } ); connector_example(post) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 93515b5db..28050d368 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -33,10 +33,13 @@ ]). -export([ + kafka_connector_config_fields/0, kafka_producer_converter/2, producer_strategy_key_validator/1 ]). +-define(CONNECTOR_TYPE, kafka_producer). + %% ------------------------------------------------------------------------------------------------- %% api @@ -76,6 +79,20 @@ conn_bridge_examples(Method) -> } ]. +values({get, connector}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ], + actions => [<<"my_action">>] + }, + values({post, connector}) + ); values({get, KafkaType}) -> maps:merge( #{ @@ -247,6 +264,12 @@ namespace() -> "bridge_kafka". roots() -> ["config_consumer", "config_producer", "config_bridge_v2"]. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, kafka_connector_config_fields()); fields("post_" ++ Type) -> [type_field(Type), name_field() | fields("config_" ++ Type)]; fields("put_" ++ Type) -> @@ -560,9 +583,11 @@ desc(Name) -> ?DESC(Name). connector_config_fields() -> + emqx_connector_schema:common_fields() ++ + kafka_connector_config_fields(). + +kafka_connector_config_fields() -> [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {description, emqx_schema:description_schema()}, {bootstrap_hosts, mk( binary(), diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl index f74e18d3b..4f7a1a370 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl @@ -22,6 +22,8 @@ connector_examples/1 ]). +-define(CONNECTOR_TYPE, matrix). + %% ------------------------------------------------------------------------------------------------- %% api @@ -60,12 +62,12 @@ fields("get_bridge_v2") -> emqx_bridge_pgsql:fields(pgsql_action); fields("post_bridge_v2") -> emqx_bridge_pgsql:fields(pgsql_action); -fields("put_connector") -> - emqx_bridge_pgsql:fields("config_connector"); -fields("get_connector") -> - emqx_bridge_pgsql:fields("config_connector"); -fields("post_connector") -> - emqx_bridge_pgsql:fields("config_connector"); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_postgresql_connector_schema:fields({Field, ?CONNECTOR_TYPE}); fields(Method) -> emqx_bridge_pgsql:fields(Method). diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index ac7aa6280..796a4a4d1 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -25,6 +25,8 @@ desc/1 ]). +-define(CONNECTOR_TYPE, mongodb). + %%================================================================================================= %% hocon_schema API %%================================================================================================= @@ -51,16 +53,18 @@ fields("config") -> ]; fields("config_connector") -> emqx_connector_schema:common_fields() ++ - [ - {parameters, - mk( - hoconsc:union([ - ref(emqx_mongodb, "connector_" ++ T) - || T <- ["single", "sharded", "rs"] - ]), - #{required => true, desc => ?DESC("mongodb_parameters")} - )} - ] ++ emqx_mongodb:fields(mongodb); + fields("connection_fields"); +fields("connection_fields") -> + [ + {parameters, + mk( + hoconsc:union([ + ref(emqx_mongodb, "connector_" ++ T) + || T <- ["single", "sharded", "rs"] + ]), + #{required => true, desc => ?DESC("mongodb_parameters")} + )} + ] ++ emqx_mongodb:fields(mongodb); fields("creation_opts") -> %% so far, mongodb connector does not support batching %% but we cannot delete this field due to compatibility reasons @@ -97,14 +101,12 @@ fields(mongodb_sharded) -> emqx_mongodb:fields(sharded) ++ fields("config"); fields(mongodb_single) -> emqx_mongodb:fields(single) ++ fields("config"); -fields("post_connector") -> - type_and_name_fields(mongodb) ++ - fields("config_connector"); -fields("put_connector") -> - fields("config_connector"); -fields("get_connector") -> - emqx_bridge_schema:status_fields() ++ - fields("post_connector"); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields("connection_fields")); fields("get_bridge_v2") -> emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); @@ -319,7 +321,8 @@ method_values(Type, get) -> node => <<"emqx@localhost">>, status => <<"connected">> } - ] + ], + actions => [<<"my_action">>] } ); method_values(_Type, put) -> diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index c267ee521..49942065a 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -35,6 +35,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). +-define(CONNECTOR_TYPE, syskeeper_forwarder). -define(SYSKEEPER_HOST_OPTIONS, #{ default_port => 9092 }). @@ -62,7 +63,8 @@ values(get) -> node => <<"emqx@localhost">>, status => <<"connected">> } - ] + ], + actions => [<<"my_action">>] }, values(post) ); @@ -89,9 +91,9 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> + emqx_connector_schema:common_fields() ++ fields("connection_fields"); +fields("connection_fields") -> [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {description, emqx_schema:description_schema()}, {server, server()}, {ack_mode, mk( @@ -110,12 +112,14 @@ fields(config) -> emqx_connector_schema_lib:pool_size(Other) end} ]; -fields("post") -> - [type_field(), name_field() | fields(config)]; -fields("put") -> - fields(config); -fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"). +fields(Field) when + Field == "get"; + Field == "post"; + Field == "put" +-> + emqx_connector_schema:api_fields( + Field ++ "_connector", ?CONNECTOR_TYPE, fields("connection_fields") + ). desc(config) -> ?DESC("desc_config"); @@ -128,12 +132,6 @@ server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). -type_field() -> - {type, mk(enum([syskeeper_forwarder]), #{required => true, desc => ?DESC("desc_type")})}. - -name_field() -> - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. - %% ------------------------------------------------------------------------------------------------- %% `emqx_resource' API diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl index 1968022c1..f930b0042 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl @@ -22,6 +22,8 @@ desc/1 ]). +-define(CONNECTOR_TYPE, syskeeper_proxy). + -define(SYSKEEPER_HOST_OPTIONS, #{ default_port => 9092 }). @@ -47,7 +49,8 @@ values(get) -> node => <<"emqx@localhost">>, status => <<"connected">> } - ] + ], + actions => [<<"my_action">>] }, values(post) ); @@ -74,9 +77,9 @@ namespace() -> "connector_syskeeper_proxy". roots() -> []. fields(config) -> + emqx_connector_schema:common_fields() ++ fields("connection_fields"); +fields("connection_fields") -> [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {description, emqx_schema:description_schema()}, {listen, listen()}, {acceptors, mk( @@ -89,12 +92,14 @@ fields(config) -> #{desc => ?DESC(handshake_timeout), default => <<"10s">>} )} ]; -fields("post") -> - [type_field(), name_field() | fields(config)]; -fields("put") -> - fields(config); -fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"). +fields(Field) when + Field == "get"; + Field == "post"; + Field == "put" +-> + emqx_connector_schema:api_fields( + Field ++ "_connector", ?CONNECTOR_TYPE, fields("connection_fields") + ). desc(config) -> ?DESC("desc_config"); @@ -106,11 +111,3 @@ desc(_) -> listen() -> Meta = #{desc => ?DESC("listen")}, emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). - -%% ------------------------------------------------------------------------------------------------- - -type_field() -> - {type, mk(enum([syskeeper_proxy]), #{required => true, desc => ?DESC("desc_type")})}. - -name_field() -> - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl index 796d9d9f6..5d6c5498d 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl @@ -22,6 +22,8 @@ connector_examples/1 ]). +-define(CONNECTOR_TYPE, timescale). + %% ------------------------------------------------------------------------------------------------- %% api @@ -44,7 +46,7 @@ roots() -> []. fields("post") -> emqx_bridge_pgsql:fields("post", timescale); fields("config_connector") -> - emqx_bridge_pgsql:fields("config_connector"); + emqx_postgresql_connector_schema:fields("config_connector"); fields(action) -> {timescale, hoconsc:mk( @@ -60,12 +62,12 @@ fields("get_bridge_v2") -> emqx_bridge_pgsql:fields(pgsql_action); fields("post_bridge_v2") -> emqx_bridge_pgsql:fields(pgsql_action); -fields("put_connector") -> - emqx_bridge_pgsql:fields("config_connector"); -fields("get_connector") -> - emqx_bridge_pgsql:fields("config_connector"); -fields("post_connector") -> - emqx_bridge_pgsql:fields("config_connector"); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_postgresql_connector_schema:fields({Field, ?CONNECTOR_TYPE}); fields(Method) -> emqx_bridge_pgsql:fields(Method). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index f6e0c0f95..d09c67c8a 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -637,15 +637,25 @@ format_resource( ). format_resource_data(ResData) -> - maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)). + maps:fold(fun format_resource_data/3, #{}, maps:with([status, error, added_channels], ResData)). format_resource_data(error, undefined, Result) -> Result; format_resource_data(error, Error, Result) -> Result#{status_reason => emqx_utils:readable_error_msg(Error)}; +format_resource_data(added_channels, Channels, Result) -> + Result#{actions => lists:map(fun format_action/1, maps:keys(Channels))}; format_resource_data(K, V, Result) -> Result#{K => V}. +format_action(Action) -> + case string:split(Action, ":", all) of + [_Prefix, _Type, Name | _] -> + Name; + _ -> + Action + end. + is_ok(ok) -> ok; is_ok(OkResult = {ok, _}) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index a7de0cf52..d6f8608ae 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -33,7 +33,12 @@ -export([get_response/0, put_request/0, post_request/0]). -export([connector_type_to_bridge_types/1]). --export([common_fields/0]). +-export([ + api_fields/3, + common_fields/0, + status_and_actions_fields/0, + type_and_name_fields/1 +]). -export([resource_opts_fields/0, resource_opts_fields/1]). @@ -352,19 +357,87 @@ roots() -> end. fields(connectors) -> - [] ++ enterprise_fields_connectors(). + [] ++ enterprise_fields_connectors(); +fields("node_status") -> + [ + node_name(), + {"status", mk(status(), #{})}, + {"status_reason", + mk(binary(), #{ + required => false, + desc => ?DESC("desc_status_reason"), + example => <<"Connection refused">> + })} + ]. desc(connectors) -> ?DESC("desc_connectors"); +desc("node_status") -> + ?DESC("desc_node_status"); desc(_) -> undefined. +api_fields("get_connector", Type, Fields) -> + lists:append( + [ + type_and_name_fields(Type), + common_fields(), + status_and_actions_fields(), + Fields + ] + ); +api_fields("post_connector", Type, Fields) -> + lists:append( + [ + type_and_name_fields(Type), + common_fields(), + Fields + ] + ); +api_fields("put_connector", _Type, Fields) -> + lists:append( + [ + common_fields(), + Fields + ] + ). + common_fields() -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {description, emqx_schema:description_schema()} ]. +type_and_name_fields(ConnectorType) -> + [ + {type, mk(ConnectorType, #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. + +status_and_actions_fields() -> + [ + {"status", mk(status(), #{desc => ?DESC("desc_status")})}, + {"status_reason", + mk(binary(), #{ + required => false, + desc => ?DESC("desc_status_reason"), + example => <<"Connection refused">> + })}, + {"node_status", + mk( + hoconsc:array(ref(?MODULE, "node_status")), + #{desc => ?DESC("desc_node_status")} + )}, + {"actions", + mk( + hoconsc:array(binary()), + #{ + desc => ?DESC("connector_actions"), + example => [<<"my_action">>] + } + )} + ]. + resource_opts_fields() -> resource_opts_fields(_Overrides = []). @@ -422,12 +495,18 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> false; _ -> {true, #{ - schema_modle => Module, + schema_module => Module, type_name => TypeName, missing_fields => MissingFileds }} end. +status() -> + hoconsc:enum([connected, disconnected, connecting, inconsistent]). + +node_name() -> + {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. + common_field_names() -> [ enable, description diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index bd8aa9ddf..0b4189396 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -175,7 +175,8 @@ groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ t_connectors_probe, - t_fail_delete_with_action + t_fail_delete_with_action, + t_actions_field ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics @@ -256,15 +257,6 @@ end_per_testcase(TestCase, Config) -> ok. -define(CONNECTOR_IMPL, dummy_connector_impl). -init_mocks(t_fail_delete_with_action) -> - init_mocks(common), - meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), - meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), - meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), - ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> - emqx_bridge_v2:get_channels_for_connector(ResId) - end), - ok; init_mocks(_TestCase) -> meck:new(emqx_connector_ee_schema, [passthrough, no_link]), meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), @@ -289,17 +281,25 @@ init_mocks(_TestCase) -> (_, _) -> connected end ), + meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + meck:expect( + ?CONNECTOR_IMPL, + on_get_channels, + fun(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId) + end + ), [?CONNECTOR_IMPL, emqx_connector_ee_schema]. -clear_resources(t_fail_delete_with_action) -> +clear_resources(_) -> lists:foreach( fun(#{type := Type, name := Name}) -> ok = emqx_bridge_v2:remove(Type, Name) end, emqx_bridge_v2:list() ), - clear_resources(common); -clear_resources(_) -> lists:foreach( fun(#{type := Type, name := Name}) -> ok = emqx_connector:remove(Type, Name) @@ -738,6 +738,62 @@ t_create_with_bad_name(Config) -> ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg), ok. +t_actions_field(Config) -> + Name = ?CONNECTOR_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"actions">> := [] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(Name), + Config + ) + ), + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + BridgeName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"connector">> := Name, + <<"kafka">> := #{}, + <<"local_topic">> := _, + <<"resource_opts">> := _ + }}, + request_json( + post, + uri(["actions"]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + ?assertMatch( + {ok, 200, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"actions">> := [BridgeName] + }}, + request_json( + get, + uri(["connectors", ConnectorID]), + Config + ) + ), + ok. + t_fail_delete_with_action(Config) -> Name = ?CONNECTOR_NAME, ?assertMatch( diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index 74591beee..94e07ba7a 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -35,6 +35,8 @@ values/1 ]). +-define(CONNECTOR_TYPE, pgsql). + roots() -> []. @@ -64,12 +66,18 @@ fields("get_bridge_v2") -> fields(pgsql_action); fields("post_bridge_v2") -> fields(pgsql_action); -fields("put_connector") -> - fields("config_connector"); -fields("get_connector") -> - fields("config_connector"); -fields("post_connector") -> - fields("config_connector"). +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + fields({Field, ?CONNECTOR_TYPE}); +fields({Field, Type}) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields(Field, Type, fields("connection_fields")). server() -> Meta = #{desc => ?DESC("server")}, @@ -94,7 +102,7 @@ connector_examples(Method) -> #{ <<"pgsql">> => #{ summary => <<"PostgreSQL Connector">>, - value => values({Method, pgsql}) + value => values({Method, <<"pgsql">>}) } } ]. @@ -109,20 +117,21 @@ values({get, PostgreSQLType}) -> node => <<"emqx@localhost">>, status => <<"connected">> } - ] + ], + actions => [<<"my_action">>] }, values({post, PostgreSQLType}) ); values({post, PostgreSQLType}) -> - values({put, PostgreSQLType}); -values({put, PostgreSQLType}) -> maps:merge( #{ - name => <<"my_action">>, + name => <<"my_", PostgreSQLType/binary, "_connector">>, type => PostgreSQLType }, values(common) ); +values({put, _PostgreSQLType}) -> + values(common); values(common) -> #{ <<"database">> => <<"emqx_data">>, diff --git a/rel/i18n/emqx_connector_schema.hocon b/rel/i18n/emqx_connector_schema.hocon index d3aa1c82b..16d153e12 100644 --- a/rel/i18n/emqx_connector_schema.hocon +++ b/rel/i18n/emqx_connector_schema.hocon @@ -10,9 +10,54 @@ connector_field.desc: connector_field.label: """Connector""" +desc_name.desc: +"""The name of the connector.""" + +desc_name.label: +"""Connector Name""" + +desc_type.desc: +"""The type of the connector.""" + +desc_type.label: +"""Connector Type""" + config_enable.desc: """Enable (true) or disable (false) this connector.""" config_enable.label: """Enable or Disable""" +desc_node_name.desc: +"""The node name.""" + +desc_node_name.label: +"""Node Name""" + +desc_node_status.desc: +"""Node status.""" + +desc_node_status.label: +"""Node Status""" + +desc_status.desc: +"""The status of the connector
+- connecting: the initial state before any health probes were made.
+- connected: when the connector passes the health probes.
+- disconnected: when the connector can not pass health probes.
+- inconsistent: When not all the nodes are at the same status.""" + +desc_status.label: +"""Connector Status""" + +desc_status_reason.desc: +"""This is the reason given in case a connector is failing to connect.""" + +desc_status_reason.label: +"""Failure reason""" + +connector_actions.desc: +"""List of actions added to this connector.""" + +connector_actions.label: +"""Actions""" }