diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl
index 25619d99a..188a550fc 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl
@@ -250,7 +250,7 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
false;
_ ->
{true, #{
- schema_modle => Module,
+ schema_module => Module,
type_name => TypeName,
missing_fields => MissingFileds
}}
diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl
index 569725a34..cf733ddfd 100644
--- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl
+++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl
@@ -31,8 +31,8 @@
-import(hoconsc, [mk/2, enum/1, ref/2]).
--define(AEH_CONNECTOR_TYPE, azure_event_hub_producer).
--define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
+-define(CONNECTOR_TYPE, azure_event_hub_producer).
+-define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
@@ -42,18 +42,17 @@ namespace() -> "bridge_azure_event_hub".
roots() -> ["config_producer"].
-fields("put_connector") ->
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
Fields = override(
- emqx_bridge_kafka:fields("put_connector"),
- connector_overrides()
- ),
- override_documentations(Fields);
-fields("get_connector") ->
- emqx_bridge_schema:status_fields() ++
- fields("post_connector");
-fields("post_connector") ->
- Fields = override(
- emqx_bridge_kafka:fields("post_connector"),
+ emqx_connector_schema:api_fields(
+ Field,
+ ?CONNECTOR_TYPE,
+ emqx_bridge_kafka:kafka_connector_config_fields()
+ ),
connector_overrides()
),
override_documentations(Fields);
@@ -170,7 +169,7 @@ struct_names() ->
bridge_v2_examples(Method) ->
[
#{
- ?AEH_CONNECTOR_TYPE_BIN => #{
+ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Azure Event Hub Action">>,
value => values({Method, bridge_v2})
}
@@ -180,7 +179,7 @@ bridge_v2_examples(Method) ->
connector_examples(Method) ->
[
#{
- ?AEH_CONNECTOR_TYPE_BIN => #{
+ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Azure Event Hub Connector">>,
value => values({Method, connector})
}
@@ -197,6 +196,20 @@ conn_bridge_examples(Method) ->
}
].
+values({get, connector}) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ],
+ actions => [<<"my_action">>]
+ },
+ values({post, connector})
+ );
values({get, AEHType}) ->
maps:merge(
#{
@@ -217,7 +230,7 @@ values({post, bridge_v2}) ->
enable => true,
connector => <<"my_azure_event_hub_producer_connector">>,
name => <<"my_azure_event_hub_producer_action">>,
- type => ?AEH_CONNECTOR_TYPE_BIN
+ type => ?CONNECTOR_TYPE_BIN
}
);
values({post, connector}) ->
@@ -225,7 +238,7 @@ values({post, connector}) ->
values(common_config),
#{
name => <<"my_azure_event_hub_producer_connector">>,
- type => ?AEH_CONNECTOR_TYPE_BIN,
+ type => ?CONNECTOR_TYPE_BIN,
ssl => #{
enable => true,
server_name_indication => <<"auto">>,
@@ -358,7 +371,7 @@ connector_overrides() ->
}
),
type => mk(
- ?AEH_CONNECTOR_TYPE,
+ ?CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("connector_type")
@@ -414,7 +427,7 @@ bridge_v2_overrides() ->
}),
ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
type => mk(
- ?AEH_CONNECTOR_TYPE,
+ ?CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("bridge_v2_type")
diff --git a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl
index 8742d7ccf..a43a8a285 100644
--- a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl
+++ b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl
@@ -30,8 +30,8 @@
-import(hoconsc, [mk/2, enum/1, ref/2]).
--define(CONFLUENT_CONNECTOR_TYPE, confluent_producer).
--define(CONFLUENT_CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
+-define(CONNECTOR_TYPE, confluent_producer).
+-define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
@@ -41,18 +41,17 @@ namespace() -> "confluent".
roots() -> ["config_producer"].
-fields("put_connector") ->
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
Fields = override(
- emqx_bridge_kafka:fields("put_connector"),
- connector_overrides()
- ),
- override_documentations(Fields);
-fields("get_connector") ->
- emqx_bridge_schema:status_fields() ++
- fields("post_connector");
-fields("post_connector") ->
- Fields = override(
- emqx_bridge_kafka:fields("post_connector"),
+ emqx_connector_schema:api_fields(
+ Field,
+ ?CONNECTOR_TYPE,
+ emqx_bridge_kafka:kafka_connector_config_fields()
+ ),
connector_overrides()
),
override_documentations(Fields);
@@ -155,7 +154,7 @@ struct_names() ->
bridge_v2_examples(Method) ->
[
#{
- ?CONFLUENT_CONNECTOR_TYPE_BIN => #{
+ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Confluent Action">>,
value => values({Method, bridge_v2})
}
@@ -165,13 +164,27 @@ bridge_v2_examples(Method) ->
connector_examples(Method) ->
[
#{
- ?CONFLUENT_CONNECTOR_TYPE_BIN => #{
+ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Confluent Connector">>,
value => values({Method, connector})
}
}
].
+values({get, connector}) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ],
+ actions => [<<"my_action">>]
+ },
+ values({post, connector})
+ );
values({get, ConfluentType}) ->
maps:merge(
#{
@@ -192,7 +205,7 @@ values({post, bridge_v2}) ->
enable => true,
connector => <<"my_confluent_producer_connector">>,
name => <<"my_confluent_producer_action">>,
- type => ?CONFLUENT_CONNECTOR_TYPE_BIN
+ type => ?CONNECTOR_TYPE_BIN
}
);
values({post, connector}) ->
@@ -200,7 +213,7 @@ values({post, connector}) ->
values(common_config),
#{
name => <<"my_confluent_producer_connector">>,
- type => ?CONFLUENT_CONNECTOR_TYPE_BIN,
+ type => ?CONNECTOR_TYPE_BIN,
ssl => #{
enable => true,
server_name_indication => <<"auto">>,
@@ -320,7 +333,7 @@ connector_overrides() ->
}
),
type => mk(
- ?CONFLUENT_CONNECTOR_TYPE,
+ ?CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("connector_type")
@@ -342,7 +355,7 @@ bridge_v2_overrides() ->
}
}),
type => mk(
- ?CONFLUENT_CONNECTOR_TYPE,
+ ?CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("bridge_v2_type")
diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl
index 0ee625824..a4c939d7a 100644
--- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl
+++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl
@@ -24,6 +24,8 @@
connector_examples/1
]).
+-define(CONNECTOR_TYPE, gcp_pubsub_producer).
+
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
@@ -68,8 +70,7 @@ fields(action_parameters) ->
fields("config_connector") ->
%% FIXME
emqx_connector_schema:common_fields() ++
- emqx_bridge_gcp_pubsub:fields(connector_config) ++
- emqx_resource_schema:fields("resource_opts");
+ connector_config_fields();
%%=========================================
%% HTTP API fields: action
%%=========================================
@@ -82,12 +83,16 @@ fields("put_bridge_v2") ->
%%=========================================
%% HTTP API fields: connector
%%=========================================
-fields("get_connector") ->
- emqx_bridge_schema:status_fields() ++ fields("post_connector");
-fields("post_connector") ->
- [type_field(), name_field() | fields("put_connector")];
-fields("put_connector") ->
- fields("config_connector").
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
+ emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, connector_config_fields()).
+
+connector_config_fields() ->
+ emqx_bridge_gcp_pubsub:fields(connector_config) ++
+ emqx_resource_schema:fields("resource_opts").
desc("config_connector") ->
?DESC("config_connector");
@@ -177,7 +182,7 @@ action_example(put) ->
connector_example(get) ->
maps:merge(
- connector_example(put),
+ connector_example(post),
#{
status => <<"connected">>,
node_status => [
@@ -185,7 +190,8 @@ connector_example(get) ->
node => <<"emqx@localhost">>,
status => <<"connected">>
}
- ]
+ ],
+ actions => [<<"my_action">>]
}
);
connector_example(post) ->
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
index 93515b5db..28050d368 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
@@ -33,10 +33,13 @@
]).
-export([
+ kafka_connector_config_fields/0,
kafka_producer_converter/2,
producer_strategy_key_validator/1
]).
+-define(CONNECTOR_TYPE, kafka_producer).
+
%% -------------------------------------------------------------------------------------------------
%% api
@@ -76,6 +79,20 @@ conn_bridge_examples(Method) ->
}
].
+values({get, connector}) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ],
+ actions => [<<"my_action">>]
+ },
+ values({post, connector})
+ );
values({get, KafkaType}) ->
maps:merge(
#{
@@ -247,6 +264,12 @@ namespace() -> "bridge_kafka".
roots() -> ["config_consumer", "config_producer", "config_bridge_v2"].
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
+ emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, kafka_connector_config_fields());
fields("post_" ++ Type) ->
[type_field(Type), name_field() | fields("config_" ++ Type)];
fields("put_" ++ Type) ->
@@ -560,9 +583,11 @@ desc(Name) ->
?DESC(Name).
connector_config_fields() ->
+ emqx_connector_schema:common_fields() ++
+ kafka_connector_config_fields().
+
+kafka_connector_config_fields() ->
[
- {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
- {description, emqx_schema:description_schema()},
{bootstrap_hosts,
mk(
binary(),
diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl
index f74e18d3b..4f7a1a370 100644
--- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl
+++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl
@@ -22,6 +22,8 @@
connector_examples/1
]).
+-define(CONNECTOR_TYPE, matrix).
+
%% -------------------------------------------------------------------------------------------------
%% api
@@ -60,12 +62,12 @@ fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
-fields("put_connector") ->
- emqx_bridge_pgsql:fields("config_connector");
-fields("get_connector") ->
- emqx_bridge_pgsql:fields("config_connector");
-fields("post_connector") ->
- emqx_bridge_pgsql:fields("config_connector");
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
+ emqx_postgresql_connector_schema:fields({Field, ?CONNECTOR_TYPE});
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl
index ac7aa6280..796a4a4d1 100644
--- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl
+++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl
@@ -25,6 +25,8 @@
desc/1
]).
+-define(CONNECTOR_TYPE, mongodb).
+
%%=================================================================================================
%% hocon_schema API
%%=================================================================================================
@@ -51,16 +53,18 @@ fields("config") ->
];
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
- [
- {parameters,
- mk(
- hoconsc:union([
- ref(emqx_mongodb, "connector_" ++ T)
- || T <- ["single", "sharded", "rs"]
- ]),
- #{required => true, desc => ?DESC("mongodb_parameters")}
- )}
- ] ++ emqx_mongodb:fields(mongodb);
+ fields("connection_fields");
+fields("connection_fields") ->
+ [
+ {parameters,
+ mk(
+ hoconsc:union([
+ ref(emqx_mongodb, "connector_" ++ T)
+ || T <- ["single", "sharded", "rs"]
+ ]),
+ #{required => true, desc => ?DESC("mongodb_parameters")}
+ )}
+ ] ++ emqx_mongodb:fields(mongodb);
fields("creation_opts") ->
%% so far, mongodb connector does not support batching
%% but we cannot delete this field due to compatibility reasons
@@ -97,14 +101,12 @@ fields(mongodb_sharded) ->
emqx_mongodb:fields(sharded) ++ fields("config");
fields(mongodb_single) ->
emqx_mongodb:fields(single) ++ fields("config");
-fields("post_connector") ->
- type_and_name_fields(mongodb) ++
- fields("config_connector");
-fields("put_connector") ->
- fields("config_connector");
-fields("get_connector") ->
- emqx_bridge_schema:status_fields() ++
- fields("post_connector");
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
+ emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields("connection_fields"));
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++
fields("post_bridge_v2");
@@ -319,7 +321,8 @@ method_values(Type, get) ->
node => <<"emqx@localhost">>,
status => <<"connected">>
}
- ]
+ ],
+ actions => [<<"my_action">>]
}
);
method_values(_Type, put) ->
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl
index c267ee521..49942065a 100644
--- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl
@@ -35,6 +35,7 @@
-import(hoconsc, [mk/2, enum/1, ref/2]).
+-define(CONNECTOR_TYPE, syskeeper_forwarder).
-define(SYSKEEPER_HOST_OPTIONS, #{
default_port => 9092
}).
@@ -62,7 +63,8 @@ values(get) ->
node => <<"emqx@localhost">>,
status => <<"connected">>
}
- ]
+ ],
+ actions => [<<"my_action">>]
},
values(post)
);
@@ -89,9 +91,9 @@ roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) ->
+ emqx_connector_schema:common_fields() ++ fields("connection_fields");
+fields("connection_fields") ->
[
- {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
- {description, emqx_schema:description_schema()},
{server, server()},
{ack_mode,
mk(
@@ -110,12 +112,14 @@ fields(config) ->
emqx_connector_schema_lib:pool_size(Other)
end}
];
-fields("post") ->
- [type_field(), name_field() | fields(config)];
-fields("put") ->
- fields(config);
-fields("get") ->
- emqx_bridge_schema:status_fields() ++ fields("post").
+fields(Field) when
+ Field == "get";
+ Field == "post";
+ Field == "put"
+->
+ emqx_connector_schema:api_fields(
+ Field ++ "_connector", ?CONNECTOR_TYPE, fields("connection_fields")
+ ).
desc(config) ->
?DESC("desc_config");
@@ -128,12 +132,6 @@ server() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS).
-type_field() ->
- {type, mk(enum([syskeeper_forwarder]), #{required => true, desc => ?DESC("desc_type")})}.
-
-name_field() ->
- {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
-
%% -------------------------------------------------------------------------------------------------
%% `emqx_resource' API
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl
index 1968022c1..f930b0042 100644
--- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl
@@ -22,6 +22,8 @@
desc/1
]).
+-define(CONNECTOR_TYPE, syskeeper_proxy).
+
-define(SYSKEEPER_HOST_OPTIONS, #{
default_port => 9092
}).
@@ -47,7 +49,8 @@ values(get) ->
node => <<"emqx@localhost">>,
status => <<"connected">>
}
- ]
+ ],
+ actions => [<<"my_action">>]
},
values(post)
);
@@ -74,9 +77,9 @@ namespace() -> "connector_syskeeper_proxy".
roots() -> [].
fields(config) ->
+ emqx_connector_schema:common_fields() ++ fields("connection_fields");
+fields("connection_fields") ->
[
- {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
- {description, emqx_schema:description_schema()},
{listen, listen()},
{acceptors,
mk(
@@ -89,12 +92,14 @@ fields(config) ->
#{desc => ?DESC(handshake_timeout), default => <<"10s">>}
)}
];
-fields("post") ->
- [type_field(), name_field() | fields(config)];
-fields("put") ->
- fields(config);
-fields("get") ->
- emqx_bridge_schema:status_fields() ++ fields("post").
+fields(Field) when
+ Field == "get";
+ Field == "post";
+ Field == "put"
+->
+ emqx_connector_schema:api_fields(
+ Field ++ "_connector", ?CONNECTOR_TYPE, fields("connection_fields")
+ ).
desc(config) ->
?DESC("desc_config");
@@ -106,11 +111,3 @@ desc(_) ->
listen() ->
Meta = #{desc => ?DESC("listen")},
emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS).
-
-%% -------------------------------------------------------------------------------------------------
-
-type_field() ->
- {type, mk(enum([syskeeper_proxy]), #{required => true, desc => ?DESC("desc_type")})}.
-
-name_field() ->
- {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl
index 796d9d9f6..5d6c5498d 100644
--- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl
+++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.erl
@@ -22,6 +22,8 @@
connector_examples/1
]).
+-define(CONNECTOR_TYPE, timescale).
+
%% -------------------------------------------------------------------------------------------------
%% api
@@ -44,7 +46,7 @@ roots() -> [].
fields("post") ->
emqx_bridge_pgsql:fields("post", timescale);
fields("config_connector") ->
- emqx_bridge_pgsql:fields("config_connector");
+ emqx_postgresql_connector_schema:fields("config_connector");
fields(action) ->
{timescale,
hoconsc:mk(
@@ -60,12 +62,12 @@ fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
-fields("put_connector") ->
- emqx_bridge_pgsql:fields("config_connector");
-fields("get_connector") ->
- emqx_bridge_pgsql:fields("config_connector");
-fields("post_connector") ->
- emqx_bridge_pgsql:fields("config_connector");
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
+ emqx_postgresql_connector_schema:fields({Field, ?CONNECTOR_TYPE});
fields(Method) ->
emqx_bridge_pgsql:fields(Method).
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index f6e0c0f95..d09c67c8a 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -637,15 +637,25 @@ format_resource(
).
format_resource_data(ResData) ->
- maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)).
+ maps:fold(fun format_resource_data/3, #{}, maps:with([status, error, added_channels], ResData)).
format_resource_data(error, undefined, Result) ->
Result;
format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_utils:readable_error_msg(Error)};
+format_resource_data(added_channels, Channels, Result) ->
+ Result#{actions => lists:map(fun format_action/1, maps:keys(Channels))};
format_resource_data(K, V, Result) ->
Result#{K => V}.
+format_action(Action) ->
+ case string:split(Action, ":", all) of
+ [_Prefix, _Type, Name | _] ->
+ Name;
+ _ ->
+ Action
+ end.
+
is_ok(ok) ->
ok;
is_ok(OkResult = {ok, _}) ->
diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
index a7de0cf52..d6f8608ae 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
@@ -33,7 +33,12 @@
-export([get_response/0, put_request/0, post_request/0]).
-export([connector_type_to_bridge_types/1]).
--export([common_fields/0]).
+-export([
+ api_fields/3,
+ common_fields/0,
+ status_and_actions_fields/0,
+ type_and_name_fields/1
+]).
-export([resource_opts_fields/0, resource_opts_fields/1]).
@@ -352,19 +357,87 @@ roots() ->
end.
fields(connectors) ->
- [] ++ enterprise_fields_connectors().
+ [] ++ enterprise_fields_connectors();
+fields("node_status") ->
+ [
+ node_name(),
+ {"status", mk(status(), #{})},
+ {"status_reason",
+ mk(binary(), #{
+ required => false,
+ desc => ?DESC("desc_status_reason"),
+ example => <<"Connection refused">>
+ })}
+ ].
desc(connectors) ->
?DESC("desc_connectors");
+desc("node_status") ->
+ ?DESC("desc_node_status");
desc(_) ->
undefined.
+api_fields("get_connector", Type, Fields) ->
+ lists:append(
+ [
+ type_and_name_fields(Type),
+ common_fields(),
+ status_and_actions_fields(),
+ Fields
+ ]
+ );
+api_fields("post_connector", Type, Fields) ->
+ lists:append(
+ [
+ type_and_name_fields(Type),
+ common_fields(),
+ Fields
+ ]
+ );
+api_fields("put_connector", _Type, Fields) ->
+ lists:append(
+ [
+ common_fields(),
+ Fields
+ ]
+ ).
+
common_fields() ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{description, emqx_schema:description_schema()}
].
+type_and_name_fields(ConnectorType) ->
+ [
+ {type, mk(ConnectorType, #{required => true, desc => ?DESC("desc_type")})},
+ {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
+ ].
+
+status_and_actions_fields() ->
+ [
+ {"status", mk(status(), #{desc => ?DESC("desc_status")})},
+ {"status_reason",
+ mk(binary(), #{
+ required => false,
+ desc => ?DESC("desc_status_reason"),
+ example => <<"Connection refused">>
+ })},
+ {"node_status",
+ mk(
+ hoconsc:array(ref(?MODULE, "node_status")),
+ #{desc => ?DESC("desc_node_status")}
+ )},
+ {"actions",
+ mk(
+ hoconsc:array(binary()),
+ #{
+ desc => ?DESC("connector_actions"),
+ example => [<<"my_action">>]
+ }
+ )}
+ ].
+
resource_opts_fields() ->
resource_opts_fields(_Overrides = []).
@@ -422,12 +495,18 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
false;
_ ->
{true, #{
- schema_modle => Module,
+ schema_module => Module,
type_name => TypeName,
missing_fields => MissingFileds
}}
end.
+status() ->
+ hoconsc:enum([connected, disconnected, connecting, inconsistent]).
+
+node_name() ->
+ {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
+
common_field_names() ->
[
enable, description
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
index bd8aa9ddf..0b4189396 100644
--- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -175,7 +175,8 @@ groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
SingleOnlyTests = [
t_connectors_probe,
- t_fail_delete_with_action
+ t_fail_delete_with_action,
+ t_actions_field
],
ClusterLaterJoinOnlyTCs = [
% t_cluster_later_join_metrics
@@ -256,15 +257,6 @@ end_per_testcase(TestCase, Config) ->
ok.
-define(CONNECTOR_IMPL, dummy_connector_impl).
-init_mocks(t_fail_delete_with_action) ->
- init_mocks(common),
- meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
- meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
- meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
- ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
- emqx_bridge_v2:get_channels_for_connector(ResId)
- end),
- ok;
init_mocks(_TestCase) ->
meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
@@ -289,17 +281,25 @@ init_mocks(_TestCase) ->
(_, _) -> connected
end
),
+ meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
+ meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
+ meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
+ meck:expect(
+ ?CONNECTOR_IMPL,
+ on_get_channels,
+ fun(ResId) ->
+ emqx_bridge_v2:get_channels_for_connector(ResId)
+ end
+ ),
[?CONNECTOR_IMPL, emqx_connector_ee_schema].
-clear_resources(t_fail_delete_with_action) ->
+clear_resources(_) ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_bridge_v2:remove(Type, Name)
end,
emqx_bridge_v2:list()
),
- clear_resources(common);
-clear_resources(_) ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_connector:remove(Type, Name)
@@ -738,6 +738,62 @@ t_create_with_bad_name(Config) ->
?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
ok.
+t_actions_field(Config) ->
+ Name = ?CONNECTOR_NAME,
+ ?assertMatch(
+ {ok, 201, #{
+ <<"type">> := ?CONNECTOR_TYPE,
+ <<"name">> := Name,
+ <<"enable">> := true,
+ <<"status">> := <<"connected">>,
+ <<"node_status">> := [_ | _],
+ <<"actions">> := []
+ }},
+ request_json(
+ post,
+ uri(["connectors"]),
+ ?KAFKA_CONNECTOR(Name),
+ Config
+ )
+ ),
+ ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
+ BridgeName = ?BRIDGE_NAME,
+ ?assertMatch(
+ {ok, 201, #{
+ <<"type">> := ?BRIDGE_TYPE,
+ <<"name">> := BridgeName,
+ <<"enable">> := true,
+ <<"status">> := <<"connected">>,
+ <<"node_status">> := [_ | _],
+ <<"connector">> := Name,
+ <<"kafka">> := #{},
+ <<"local_topic">> := _,
+ <<"resource_opts">> := _
+ }},
+ request_json(
+ post,
+ uri(["actions"]),
+ ?KAFKA_BRIDGE(?BRIDGE_NAME),
+ Config
+ )
+ ),
+ ?assertMatch(
+ {ok, 200, #{
+ <<"type">> := ?CONNECTOR_TYPE,
+ <<"name">> := Name,
+ <<"enable">> := true,
+ <<"status">> := <<"connected">>,
+ <<"node_status">> := [_ | _],
+ <<"actions">> := [BridgeName]
+ }},
+ request_json(
+ get,
+ uri(["connectors", ConnectorID]),
+ Config
+ )
+ ),
+ ok.
+
t_fail_delete_with_action(Config) ->
Name = ?CONNECTOR_NAME,
?assertMatch(
diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl
index 74591beee..94e07ba7a 100644
--- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl
+++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl
@@ -35,6 +35,8 @@
values/1
]).
+-define(CONNECTOR_TYPE, pgsql).
+
roots() ->
[].
@@ -64,12 +66,18 @@ fields("get_bridge_v2") ->
fields(pgsql_action);
fields("post_bridge_v2") ->
fields(pgsql_action);
-fields("put_connector") ->
- fields("config_connector");
-fields("get_connector") ->
- fields("config_connector");
-fields("post_connector") ->
- fields("config_connector").
+fields(Field) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
+ fields({Field, ?CONNECTOR_TYPE});
+fields({Field, Type}) when
+ Field == "get_connector";
+ Field == "put_connector";
+ Field == "post_connector"
+->
+ emqx_connector_schema:api_fields(Field, Type, fields("connection_fields")).
server() ->
Meta = #{desc => ?DESC("server")},
@@ -94,7 +102,7 @@ connector_examples(Method) ->
#{
<<"pgsql">> => #{
summary => <<"PostgreSQL Connector">>,
- value => values({Method, pgsql})
+ value => values({Method, <<"pgsql">>})
}
}
].
@@ -109,20 +117,21 @@ values({get, PostgreSQLType}) ->
node => <<"emqx@localhost">>,
status => <<"connected">>
}
- ]
+ ],
+ actions => [<<"my_action">>]
},
values({post, PostgreSQLType})
);
values({post, PostgreSQLType}) ->
- values({put, PostgreSQLType});
-values({put, PostgreSQLType}) ->
maps:merge(
#{
- name => <<"my_action">>,
+ name => <<"my_", PostgreSQLType/binary, "_connector">>,
type => PostgreSQLType
},
values(common)
);
+values({put, _PostgreSQLType}) ->
+ values(common);
values(common) ->
#{
<<"database">> => <<"emqx_data">>,
diff --git a/rel/i18n/emqx_connector_schema.hocon b/rel/i18n/emqx_connector_schema.hocon
index d3aa1c82b..16d153e12 100644
--- a/rel/i18n/emqx_connector_schema.hocon
+++ b/rel/i18n/emqx_connector_schema.hocon
@@ -10,9 +10,54 @@ connector_field.desc:
connector_field.label:
"""Connector"""
+desc_name.desc:
+"""The name of the connector."""
+
+desc_name.label:
+"""Connector Name"""
+
+desc_type.desc:
+"""The type of the connector."""
+
+desc_type.label:
+"""Connector Type"""
+
config_enable.desc:
"""Enable (true) or disable (false) this connector."""
config_enable.label:
"""Enable or Disable"""
+desc_node_name.desc:
+"""The node name."""
+
+desc_node_name.label:
+"""Node Name"""
+
+desc_node_status.desc:
+"""Node status."""
+
+desc_node_status.label:
+"""Node Status"""
+
+desc_status.desc:
+"""The status of the connector
+- connecting
: the initial state before any health probes were made.
+- connected
: when the connector passes the health probes.
+- disconnected
: when the connector can not pass health probes.
+- inconsistent
: When not all the nodes are at the same status."""
+
+desc_status.label:
+"""Connector Status"""
+
+desc_status_reason.desc:
+"""This is the reason given in case a connector is failing to connect."""
+
+desc_status_reason.label:
+"""Failure reason"""
+
+connector_actions.desc:
+"""List of actions added to this connector."""
+
+connector_actions.label:
+"""Actions"""
}