feat(emqx_connector): add field 'actions' in API response

Also unify schemas, use emqx_connector_schema for the generic parts.
This commit is contained in:
Stefan Strigler 2023-11-27 16:09:17 +01:00
parent b5a00ec6b2
commit 048f4724a9
15 changed files with 401 additions and 143 deletions

View File

@ -250,7 +250,7 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
false; false;
_ -> _ ->
{true, #{ {true, #{
schema_modle => Module, schema_module => Module,
type_name => TypeName, type_name => TypeName,
missing_fields => MissingFileds missing_fields => MissingFileds
}} }}

View File

@ -31,8 +31,8 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-define(AEH_CONNECTOR_TYPE, azure_event_hub_producer). -define(CONNECTOR_TYPE, azure_event_hub_producer).
-define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>). -define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API %% `hocon_schema' API
@ -42,18 +42,17 @@ namespace() -> "bridge_azure_event_hub".
roots() -> ["config_producer"]. roots() -> ["config_producer"].
fields("put_connector") -> fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields = override( Fields = override(
emqx_bridge_kafka:fields("put_connector"), emqx_connector_schema:api_fields(
connector_overrides() Field,
), ?CONNECTOR_TYPE,
override_documentations(Fields); emqx_bridge_kafka:kafka_connector_config_fields()
fields("get_connector") -> ),
emqx_bridge_schema:status_fields() ++
fields("post_connector");
fields("post_connector") ->
Fields = override(
emqx_bridge_kafka:fields("post_connector"),
connector_overrides() connector_overrides()
), ),
override_documentations(Fields); override_documentations(Fields);
@ -170,7 +169,7 @@ struct_names() ->
bridge_v2_examples(Method) -> bridge_v2_examples(Method) ->
[ [
#{ #{
?AEH_CONNECTOR_TYPE_BIN => #{ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Azure Event Hub Action">>, summary => <<"Azure Event Hub Action">>,
value => values({Method, bridge_v2}) value => values({Method, bridge_v2})
} }
@ -180,7 +179,7 @@ bridge_v2_examples(Method) ->
connector_examples(Method) -> connector_examples(Method) ->
[ [
#{ #{
?AEH_CONNECTOR_TYPE_BIN => #{ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Azure Event Hub Connector">>, summary => <<"Azure Event Hub Connector">>,
value => values({Method, connector}) value => values({Method, connector})
} }
@ -197,6 +196,20 @@ conn_bridge_examples(Method) ->
} }
]. ].
values({get, connector}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
],
actions => [<<"my_action">>]
},
values({post, connector})
);
values({get, AEHType}) -> values({get, AEHType}) ->
maps:merge( maps:merge(
#{ #{
@ -217,7 +230,7 @@ values({post, bridge_v2}) ->
enable => true, enable => true,
connector => <<"my_azure_event_hub_producer_connector">>, connector => <<"my_azure_event_hub_producer_connector">>,
name => <<"my_azure_event_hub_producer_action">>, name => <<"my_azure_event_hub_producer_action">>,
type => ?AEH_CONNECTOR_TYPE_BIN type => ?CONNECTOR_TYPE_BIN
} }
); );
values({post, connector}) -> values({post, connector}) ->
@ -225,7 +238,7 @@ values({post, connector}) ->
values(common_config), values(common_config),
#{ #{
name => <<"my_azure_event_hub_producer_connector">>, name => <<"my_azure_event_hub_producer_connector">>,
type => ?AEH_CONNECTOR_TYPE_BIN, type => ?CONNECTOR_TYPE_BIN,
ssl => #{ ssl => #{
enable => true, enable => true,
server_name_indication => <<"auto">>, server_name_indication => <<"auto">>,
@ -358,7 +371,7 @@ connector_overrides() ->
} }
), ),
type => mk( type => mk(
?AEH_CONNECTOR_TYPE, ?CONNECTOR_TYPE,
#{ #{
required => true, required => true,
desc => ?DESC("connector_type") desc => ?DESC("connector_type")
@ -414,7 +427,7 @@ bridge_v2_overrides() ->
}), }),
ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}), ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
type => mk( type => mk(
?AEH_CONNECTOR_TYPE, ?CONNECTOR_TYPE,
#{ #{
required => true, required => true,
desc => ?DESC("bridge_v2_type") desc => ?DESC("bridge_v2_type")

View File

@ -30,8 +30,8 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-define(CONFLUENT_CONNECTOR_TYPE, confluent_producer). -define(CONNECTOR_TYPE, confluent_producer).
-define(CONFLUENT_CONNECTOR_TYPE_BIN, <<"confluent_producer">>). -define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API %% `hocon_schema' API
@ -41,18 +41,17 @@ namespace() -> "confluent".
roots() -> ["config_producer"]. roots() -> ["config_producer"].
fields("put_connector") -> fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields = override( Fields = override(
emqx_bridge_kafka:fields("put_connector"), emqx_connector_schema:api_fields(
connector_overrides() Field,
), ?CONNECTOR_TYPE,
override_documentations(Fields); emqx_bridge_kafka:kafka_connector_config_fields()
fields("get_connector") -> ),
emqx_bridge_schema:status_fields() ++
fields("post_connector");
fields("post_connector") ->
Fields = override(
emqx_bridge_kafka:fields("post_connector"),
connector_overrides() connector_overrides()
), ),
override_documentations(Fields); override_documentations(Fields);
@ -155,7 +154,7 @@ struct_names() ->
bridge_v2_examples(Method) -> bridge_v2_examples(Method) ->
[ [
#{ #{
?CONFLUENT_CONNECTOR_TYPE_BIN => #{ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Confluent Action">>, summary => <<"Confluent Action">>,
value => values({Method, bridge_v2}) value => values({Method, bridge_v2})
} }
@ -165,13 +164,27 @@ bridge_v2_examples(Method) ->
connector_examples(Method) -> connector_examples(Method) ->
[ [
#{ #{
?CONFLUENT_CONNECTOR_TYPE_BIN => #{ ?CONNECTOR_TYPE_BIN => #{
summary => <<"Confluent Connector">>, summary => <<"Confluent Connector">>,
value => values({Method, connector}) value => values({Method, connector})
} }
} }
]. ].
values({get, connector}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
],
actions => [<<"my_action">>]
},
values({post, connector})
);
values({get, ConfluentType}) -> values({get, ConfluentType}) ->
maps:merge( maps:merge(
#{ #{
@ -192,7 +205,7 @@ values({post, bridge_v2}) ->
enable => true, enable => true,
connector => <<"my_confluent_producer_connector">>, connector => <<"my_confluent_producer_connector">>,
name => <<"my_confluent_producer_action">>, name => <<"my_confluent_producer_action">>,
type => ?CONFLUENT_CONNECTOR_TYPE_BIN type => ?CONNECTOR_TYPE_BIN
} }
); );
values({post, connector}) -> values({post, connector}) ->
@ -200,7 +213,7 @@ values({post, connector}) ->
values(common_config), values(common_config),
#{ #{
name => <<"my_confluent_producer_connector">>, name => <<"my_confluent_producer_connector">>,
type => ?CONFLUENT_CONNECTOR_TYPE_BIN, type => ?CONNECTOR_TYPE_BIN,
ssl => #{ ssl => #{
enable => true, enable => true,
server_name_indication => <<"auto">>, server_name_indication => <<"auto">>,
@ -320,7 +333,7 @@ connector_overrides() ->
} }
), ),
type => mk( type => mk(
?CONFLUENT_CONNECTOR_TYPE, ?CONNECTOR_TYPE,
#{ #{
required => true, required => true,
desc => ?DESC("connector_type") desc => ?DESC("connector_type")
@ -342,7 +355,7 @@ bridge_v2_overrides() ->
} }
}), }),
type => mk( type => mk(
?CONFLUENT_CONNECTOR_TYPE, ?CONNECTOR_TYPE,
#{ #{
required => true, required => true,
desc => ?DESC("bridge_v2_type") desc => ?DESC("bridge_v2_type")

View File

@ -24,6 +24,8 @@
connector_examples/1 connector_examples/1
]). ]).
-define(CONNECTOR_TYPE, gcp_pubsub_producer).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API %% `hocon_schema' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -68,8 +70,7 @@ fields(action_parameters) ->
fields("config_connector") -> fields("config_connector") ->
%% FIXME %% FIXME
emqx_connector_schema:common_fields() ++ emqx_connector_schema:common_fields() ++
emqx_bridge_gcp_pubsub:fields(connector_config) ++ connector_config_fields();
emqx_resource_schema:fields("resource_opts");
%%========================================= %%=========================================
%% HTTP API fields: action %% HTTP API fields: action
%%========================================= %%=========================================
@ -82,12 +83,16 @@ fields("put_bridge_v2") ->
%%========================================= %%=========================================
%% HTTP API fields: connector %% HTTP API fields: connector
%%========================================= %%=========================================
fields("get_connector") -> fields(Field) when
emqx_bridge_schema:status_fields() ++ fields("post_connector"); Field == "get_connector";
fields("post_connector") -> Field == "put_connector";
[type_field(), name_field() | fields("put_connector")]; Field == "post_connector"
fields("put_connector") -> ->
fields("config_connector"). emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, connector_config_fields()).
connector_config_fields() ->
emqx_bridge_gcp_pubsub:fields(connector_config) ++
emqx_resource_schema:fields("resource_opts").
desc("config_connector") -> desc("config_connector") ->
?DESC("config_connector"); ?DESC("config_connector");
@ -177,7 +182,7 @@ action_example(put) ->
connector_example(get) -> connector_example(get) ->
maps:merge( maps:merge(
connector_example(put), connector_example(post),
#{ #{
status => <<"connected">>, status => <<"connected">>,
node_status => [ node_status => [
@ -185,7 +190,8 @@ connector_example(get) ->
node => <<"emqx@localhost">>, node => <<"emqx@localhost">>,
status => <<"connected">> status => <<"connected">>
} }
] ],
actions => [<<"my_action">>]
} }
); );
connector_example(post) -> connector_example(post) ->

View File

@ -33,10 +33,13 @@
]). ]).
-export([ -export([
kafka_connector_config_fields/0,
kafka_producer_converter/2, kafka_producer_converter/2,
producer_strategy_key_validator/1 producer_strategy_key_validator/1
]). ]).
-define(CONNECTOR_TYPE, kafka_producer).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% api %% api
@ -76,6 +79,20 @@ conn_bridge_examples(Method) ->
} }
]. ].
values({get, connector}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
],
actions => [<<"my_action">>]
},
values({post, connector})
);
values({get, KafkaType}) -> values({get, KafkaType}) ->
maps:merge( maps:merge(
#{ #{
@ -247,6 +264,12 @@ namespace() -> "bridge_kafka".
roots() -> ["config_consumer", "config_producer", "config_bridge_v2"]. roots() -> ["config_consumer", "config_producer", "config_bridge_v2"].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, kafka_connector_config_fields());
fields("post_" ++ Type) -> fields("post_" ++ Type) ->
[type_field(Type), name_field() | fields("config_" ++ Type)]; [type_field(Type), name_field() | fields("config_" ++ Type)];
fields("put_" ++ Type) -> fields("put_" ++ Type) ->
@ -560,9 +583,11 @@ desc(Name) ->
?DESC(Name). ?DESC(Name).
connector_config_fields() -> connector_config_fields() ->
emqx_connector_schema:common_fields() ++
kafka_connector_config_fields().
kafka_connector_config_fields() ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{description, emqx_schema:description_schema()},
{bootstrap_hosts, {bootstrap_hosts,
mk( mk(
binary(), binary(),

View File

@ -22,6 +22,8 @@
connector_examples/1 connector_examples/1
]). ]).
-define(CONNECTOR_TYPE, matrix).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% api %% api
@ -60,12 +62,12 @@ fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action); emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") -> fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action); emqx_bridge_pgsql:fields(pgsql_action);
fields("put_connector") -> fields(Field) when
emqx_bridge_pgsql:fields("config_connector"); Field == "get_connector";
fields("get_connector") -> Field == "put_connector";
emqx_bridge_pgsql:fields("config_connector"); Field == "post_connector"
fields("post_connector") -> ->
emqx_bridge_pgsql:fields("config_connector"); emqx_postgresql_connector_schema:fields({Field, ?CONNECTOR_TYPE});
fields(Method) -> fields(Method) ->
emqx_bridge_pgsql:fields(Method). emqx_bridge_pgsql:fields(Method).

View File

@ -25,6 +25,8 @@
desc/1 desc/1
]). ]).
-define(CONNECTOR_TYPE, mongodb).
%%================================================================================================= %%=================================================================================================
%% hocon_schema API %% hocon_schema API
%%================================================================================================= %%=================================================================================================
@ -51,16 +53,18 @@ fields("config") ->
]; ];
fields("config_connector") -> fields("config_connector") ->
emqx_connector_schema:common_fields() ++ emqx_connector_schema:common_fields() ++
[ fields("connection_fields");
{parameters, fields("connection_fields") ->
mk( [
hoconsc:union([ {parameters,
ref(emqx_mongodb, "connector_" ++ T) mk(
|| T <- ["single", "sharded", "rs"] hoconsc:union([
]), ref(emqx_mongodb, "connector_" ++ T)
#{required => true, desc => ?DESC("mongodb_parameters")} || T <- ["single", "sharded", "rs"]
)} ]),
] ++ emqx_mongodb:fields(mongodb); #{required => true, desc => ?DESC("mongodb_parameters")}
)}
] ++ emqx_mongodb:fields(mongodb);
fields("creation_opts") -> fields("creation_opts") ->
%% so far, mongodb connector does not support batching %% so far, mongodb connector does not support batching
%% but we cannot delete this field due to compatibility reasons %% but we cannot delete this field due to compatibility reasons
@ -97,14 +101,12 @@ fields(mongodb_sharded) ->
emqx_mongodb:fields(sharded) ++ fields("config"); emqx_mongodb:fields(sharded) ++ fields("config");
fields(mongodb_single) -> fields(mongodb_single) ->
emqx_mongodb:fields(single) ++ fields("config"); emqx_mongodb:fields(single) ++ fields("config");
fields("post_connector") -> fields(Field) when
type_and_name_fields(mongodb) ++ Field == "get_connector";
fields("config_connector"); Field == "put_connector";
fields("put_connector") -> Field == "post_connector"
fields("config_connector"); ->
fields("get_connector") -> emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields("connection_fields"));
emqx_bridge_schema:status_fields() ++
fields("post_connector");
fields("get_bridge_v2") -> fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++ emqx_bridge_schema:status_fields() ++
fields("post_bridge_v2"); fields("post_bridge_v2");
@ -319,7 +321,8 @@ method_values(Type, get) ->
node => <<"emqx@localhost">>, node => <<"emqx@localhost">>,
status => <<"connected">> status => <<"connected">>
} }
] ],
actions => [<<"my_action">>]
} }
); );
method_values(_Type, put) -> method_values(_Type, put) ->

View File

@ -35,6 +35,7 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-define(CONNECTOR_TYPE, syskeeper_forwarder).
-define(SYSKEEPER_HOST_OPTIONS, #{ -define(SYSKEEPER_HOST_OPTIONS, #{
default_port => 9092 default_port => 9092
}). }).
@ -62,7 +63,8 @@ values(get) ->
node => <<"emqx@localhost">>, node => <<"emqx@localhost">>,
status => <<"connected">> status => <<"connected">>
} }
] ],
actions => [<<"my_action">>]
}, },
values(post) values(post)
); );
@ -89,9 +91,9 @@ roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}]. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) -> fields(config) ->
emqx_connector_schema:common_fields() ++ fields("connection_fields");
fields("connection_fields") ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{description, emqx_schema:description_schema()},
{server, server()}, {server, server()},
{ack_mode, {ack_mode,
mk( mk(
@ -110,12 +112,14 @@ fields(config) ->
emqx_connector_schema_lib:pool_size(Other) emqx_connector_schema_lib:pool_size(Other)
end} end}
]; ];
fields("post") -> fields(Field) when
[type_field(), name_field() | fields(config)]; Field == "get";
fields("put") -> Field == "post";
fields(config); Field == "put"
fields("get") -> ->
emqx_bridge_schema:status_fields() ++ fields("post"). emqx_connector_schema:api_fields(
Field ++ "_connector", ?CONNECTOR_TYPE, fields("connection_fields")
).
desc(config) -> desc(config) ->
?DESC("desc_config"); ?DESC("desc_config");
@ -128,12 +132,6 @@ server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS).
type_field() ->
{type, mk(enum([syskeeper_forwarder]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API

View File

@ -22,6 +22,8 @@
desc/1 desc/1
]). ]).
-define(CONNECTOR_TYPE, syskeeper_proxy).
-define(SYSKEEPER_HOST_OPTIONS, #{ -define(SYSKEEPER_HOST_OPTIONS, #{
default_port => 9092 default_port => 9092
}). }).
@ -47,7 +49,8 @@ values(get) ->
node => <<"emqx@localhost">>, node => <<"emqx@localhost">>,
status => <<"connected">> status => <<"connected">>
} }
] ],
actions => [<<"my_action">>]
}, },
values(post) values(post)
); );
@ -74,9 +77,9 @@ namespace() -> "connector_syskeeper_proxy".
roots() -> []. roots() -> [].
fields(config) -> fields(config) ->
emqx_connector_schema:common_fields() ++ fields("connection_fields");
fields("connection_fields") ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{description, emqx_schema:description_schema()},
{listen, listen()}, {listen, listen()},
{acceptors, {acceptors,
mk( mk(
@ -89,12 +92,14 @@ fields(config) ->
#{desc => ?DESC(handshake_timeout), default => <<"10s">>} #{desc => ?DESC(handshake_timeout), default => <<"10s">>}
)} )}
]; ];
fields("post") -> fields(Field) when
[type_field(), name_field() | fields(config)]; Field == "get";
fields("put") -> Field == "post";
fields(config); Field == "put"
fields("get") -> ->
emqx_bridge_schema:status_fields() ++ fields("post"). emqx_connector_schema:api_fields(
Field ++ "_connector", ?CONNECTOR_TYPE, fields("connection_fields")
).
desc(config) -> desc(config) ->
?DESC("desc_config"); ?DESC("desc_config");
@ -106,11 +111,3 @@ desc(_) ->
listen() -> listen() ->
Meta = #{desc => ?DESC("listen")}, Meta = #{desc => ?DESC("listen")},
emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS).
%% -------------------------------------------------------------------------------------------------
type_field() ->
{type, mk(enum([syskeeper_proxy]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

View File

@ -22,6 +22,8 @@
connector_examples/1 connector_examples/1
]). ]).
-define(CONNECTOR_TYPE, timescale).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% api %% api
@ -44,7 +46,7 @@ roots() -> [].
fields("post") -> fields("post") ->
emqx_bridge_pgsql:fields("post", timescale); emqx_bridge_pgsql:fields("post", timescale);
fields("config_connector") -> fields("config_connector") ->
emqx_bridge_pgsql:fields("config_connector"); emqx_postgresql_connector_schema:fields("config_connector");
fields(action) -> fields(action) ->
{timescale, {timescale,
hoconsc:mk( hoconsc:mk(
@ -60,12 +62,12 @@ fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action); emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") -> fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action); emqx_bridge_pgsql:fields(pgsql_action);
fields("put_connector") -> fields(Field) when
emqx_bridge_pgsql:fields("config_connector"); Field == "get_connector";
fields("get_connector") -> Field == "put_connector";
emqx_bridge_pgsql:fields("config_connector"); Field == "post_connector"
fields("post_connector") -> ->
emqx_bridge_pgsql:fields("config_connector"); emqx_postgresql_connector_schema:fields({Field, ?CONNECTOR_TYPE});
fields(Method) -> fields(Method) ->
emqx_bridge_pgsql:fields(Method). emqx_bridge_pgsql:fields(Method).

View File

@ -637,15 +637,25 @@ format_resource(
). ).
format_resource_data(ResData) -> format_resource_data(ResData) ->
maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)). maps:fold(fun format_resource_data/3, #{}, maps:with([status, error, added_channels], ResData)).
format_resource_data(error, undefined, Result) -> format_resource_data(error, undefined, Result) ->
Result; Result;
format_resource_data(error, Error, Result) -> format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_utils:readable_error_msg(Error)}; Result#{status_reason => emqx_utils:readable_error_msg(Error)};
format_resource_data(added_channels, Channels, Result) ->
Result#{actions => lists:map(fun format_action/1, maps:keys(Channels))};
format_resource_data(K, V, Result) -> format_resource_data(K, V, Result) ->
Result#{K => V}. Result#{K => V}.
format_action(Action) ->
case string:split(Action, ":", all) of
[_Prefix, _Type, Name | _] ->
Name;
_ ->
Action
end.
is_ok(ok) -> is_ok(ok) ->
ok; ok;
is_ok(OkResult = {ok, _}) -> is_ok(OkResult = {ok, _}) ->

View File

@ -33,7 +33,12 @@
-export([get_response/0, put_request/0, post_request/0]). -export([get_response/0, put_request/0, post_request/0]).
-export([connector_type_to_bridge_types/1]). -export([connector_type_to_bridge_types/1]).
-export([common_fields/0]). -export([
api_fields/3,
common_fields/0,
status_and_actions_fields/0,
type_and_name_fields/1
]).
-export([resource_opts_fields/0, resource_opts_fields/1]). -export([resource_opts_fields/0, resource_opts_fields/1]).
@ -352,19 +357,87 @@ roots() ->
end. end.
fields(connectors) -> fields(connectors) ->
[] ++ enterprise_fields_connectors(). [] ++ enterprise_fields_connectors();
fields("node_status") ->
[
node_name(),
{"status", mk(status(), #{})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})}
].
desc(connectors) -> desc(connectors) ->
?DESC("desc_connectors"); ?DESC("desc_connectors");
desc("node_status") ->
?DESC("desc_node_status");
desc(_) -> desc(_) ->
undefined. undefined.
api_fields("get_connector", Type, Fields) ->
lists:append(
[
type_and_name_fields(Type),
common_fields(),
status_and_actions_fields(),
Fields
]
);
api_fields("post_connector", Type, Fields) ->
lists:append(
[
type_and_name_fields(Type),
common_fields(),
Fields
]
);
api_fields("put_connector", _Type, Fields) ->
lists:append(
[
common_fields(),
Fields
]
).
common_fields() -> common_fields() ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{description, emqx_schema:description_schema()} {description, emqx_schema:description_schema()}
]. ].
type_and_name_fields(ConnectorType) ->
[
{type, mk(ConnectorType, #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].
status_and_actions_fields() ->
[
{"status", mk(status(), #{desc => ?DESC("desc_status")})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})},
{"node_status",
mk(
hoconsc:array(ref(?MODULE, "node_status")),
#{desc => ?DESC("desc_node_status")}
)},
{"actions",
mk(
hoconsc:array(binary()),
#{
desc => ?DESC("connector_actions"),
example => [<<"my_action">>]
}
)}
].
resource_opts_fields() -> resource_opts_fields() ->
resource_opts_fields(_Overrides = []). resource_opts_fields(_Overrides = []).
@ -422,12 +495,18 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
false; false;
_ -> _ ->
{true, #{ {true, #{
schema_modle => Module, schema_module => Module,
type_name => TypeName, type_name => TypeName,
missing_fields => MissingFileds missing_fields => MissingFileds
}} }}
end. end.
status() ->
hoconsc:enum([connected, disconnected, connecting, inconsistent]).
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
common_field_names() -> common_field_names() ->
[ [
enable, description enable, description

View File

@ -175,7 +175,8 @@ groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE), AllTCs = emqx_common_test_helpers:all(?MODULE),
SingleOnlyTests = [ SingleOnlyTests = [
t_connectors_probe, t_connectors_probe,
t_fail_delete_with_action t_fail_delete_with_action,
t_actions_field
], ],
ClusterLaterJoinOnlyTCs = [ ClusterLaterJoinOnlyTCs = [
% t_cluster_later_join_metrics % t_cluster_later_join_metrics
@ -256,15 +257,6 @@ end_per_testcase(TestCase, Config) ->
ok. ok.
-define(CONNECTOR_IMPL, dummy_connector_impl). -define(CONNECTOR_IMPL, dummy_connector_impl).
init_mocks(t_fail_delete_with_action) ->
init_mocks(common),
meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId)
end),
ok;
init_mocks(_TestCase) -> init_mocks(_TestCase) ->
meck:new(emqx_connector_ee_schema, [passthrough, no_link]), meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
@ -289,17 +281,25 @@ init_mocks(_TestCase) ->
(_, _) -> connected (_, _) -> connected
end end
), ),
meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
meck:expect(
?CONNECTOR_IMPL,
on_get_channels,
fun(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId)
end
),
[?CONNECTOR_IMPL, emqx_connector_ee_schema]. [?CONNECTOR_IMPL, emqx_connector_ee_schema].
clear_resources(t_fail_delete_with_action) -> clear_resources(_) ->
lists:foreach( lists:foreach(
fun(#{type := Type, name := Name}) -> fun(#{type := Type, name := Name}) ->
ok = emqx_bridge_v2:remove(Type, Name) ok = emqx_bridge_v2:remove(Type, Name)
end, end,
emqx_bridge_v2:list() emqx_bridge_v2:list()
), ),
clear_resources(common);
clear_resources(_) ->
lists:foreach( lists:foreach(
fun(#{type := Type, name := Name}) -> fun(#{type := Type, name := Name}) ->
ok = emqx_connector:remove(Type, Name) ok = emqx_connector:remove(Type, Name)
@ -738,6 +738,62 @@ t_create_with_bad_name(Config) ->
?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg), ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
ok. ok.
t_actions_field(Config) ->
Name = ?CONNECTOR_NAME,
?assertMatch(
{ok, 201, #{
<<"type">> := ?CONNECTOR_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"actions">> := []
}},
request_json(
post,
uri(["connectors"]),
?KAFKA_CONNECTOR(Name),
Config
)
),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
BridgeName = ?BRIDGE_NAME,
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := BridgeName,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"connector">> := Name,
<<"kafka">> := #{},
<<"local_topic">> := _,
<<"resource_opts">> := _
}},
request_json(
post,
uri(["actions"]),
?KAFKA_BRIDGE(?BRIDGE_NAME),
Config
)
),
?assertMatch(
{ok, 200, #{
<<"type">> := ?CONNECTOR_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"actions">> := [BridgeName]
}},
request_json(
get,
uri(["connectors", ConnectorID]),
Config
)
),
ok.
t_fail_delete_with_action(Config) -> t_fail_delete_with_action(Config) ->
Name = ?CONNECTOR_NAME, Name = ?CONNECTOR_NAME,
?assertMatch( ?assertMatch(

View File

@ -35,6 +35,8 @@
values/1 values/1
]). ]).
-define(CONNECTOR_TYPE, pgsql).
roots() -> roots() ->
[]. [].
@ -64,12 +66,18 @@ fields("get_bridge_v2") ->
fields(pgsql_action); fields(pgsql_action);
fields("post_bridge_v2") -> fields("post_bridge_v2") ->
fields(pgsql_action); fields(pgsql_action);
fields("put_connector") -> fields(Field) when
fields("config_connector"); Field == "get_connector";
fields("get_connector") -> Field == "put_connector";
fields("config_connector"); Field == "post_connector"
fields("post_connector") -> ->
fields("config_connector"). fields({Field, ?CONNECTOR_TYPE});
fields({Field, Type}) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, Type, fields("connection_fields")).
server() -> server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},
@ -94,7 +102,7 @@ connector_examples(Method) ->
#{ #{
<<"pgsql">> => #{ <<"pgsql">> => #{
summary => <<"PostgreSQL Connector">>, summary => <<"PostgreSQL Connector">>,
value => values({Method, pgsql}) value => values({Method, <<"pgsql">>})
} }
} }
]. ].
@ -109,20 +117,21 @@ values({get, PostgreSQLType}) ->
node => <<"emqx@localhost">>, node => <<"emqx@localhost">>,
status => <<"connected">> status => <<"connected">>
} }
] ],
actions => [<<"my_action">>]
}, },
values({post, PostgreSQLType}) values({post, PostgreSQLType})
); );
values({post, PostgreSQLType}) -> values({post, PostgreSQLType}) ->
values({put, PostgreSQLType});
values({put, PostgreSQLType}) ->
maps:merge( maps:merge(
#{ #{
name => <<"my_action">>, name => <<"my_", PostgreSQLType/binary, "_connector">>,
type => PostgreSQLType type => PostgreSQLType
}, },
values(common) values(common)
); );
values({put, _PostgreSQLType}) ->
values(common);
values(common) -> values(common) ->
#{ #{
<<"database">> => <<"emqx_data">>, <<"database">> => <<"emqx_data">>,

View File

@ -10,9 +10,54 @@ connector_field.desc:
connector_field.label: connector_field.label:
"""Connector""" """Connector"""
desc_name.desc:
"""The name of the connector."""
desc_name.label:
"""Connector Name"""
desc_type.desc:
"""The type of the connector."""
desc_type.label:
"""Connector Type"""
config_enable.desc: config_enable.desc:
"""Enable (true) or disable (false) this connector.""" """Enable (true) or disable (false) this connector."""
config_enable.label: config_enable.label:
"""Enable or Disable""" """Enable or Disable"""
desc_node_name.desc:
"""The node name."""
desc_node_name.label:
"""Node Name"""
desc_node_status.desc:
"""Node status."""
desc_node_status.label:
"""Node Status"""
desc_status.desc:
"""The status of the connector<br/>
- <code>connecting</code>: the initial state before any health probes were made.<br/>
- <code>connected</code>: when the connector passes the health probes.<br/>
- <code>disconnected</code>: when the connector can not pass health probes.<br/>
- <code>inconsistent</code>: When not all the nodes are at the same status."""
desc_status.label:
"""Connector Status"""
desc_status_reason.desc:
"""This is the reason given in case a connector is failing to connect."""
desc_status_reason.label:
"""Failure reason"""
connector_actions.desc:
"""List of actions added to this connector."""
connector_actions.label:
"""Actions"""
} }