diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 7004b70a1..dca55ba23 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -61,7 +61,11 @@ fields("config") -> {connector, field(connector)} ]; fields("post") -> - [type_field(), name_field() | fields("config")]; + [ + {type, mk(enum([influxdb]), #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + | fields("config") + ]; fields("put") -> fields("config"); fields("get") -> @@ -89,11 +93,3 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for HStream using `", string:to_upper(Method), "` method."]; desc(_) -> undefined. - -%% ------------------------------------------------------------------------------------------------- -%% internal -type_field() -> - {type, mk(enum([influxdb]), #{required => true, desc => ?DESC("desc_type")})}. - -name_field() -> - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl index 55ccc2c2c..9e8e7bac1 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl @@ -26,17 +26,16 @@ fields(connectors) -> hoconsc:map(name, ref(emqx_ee_connector_hstream, config)), #{desc => <<"EMQX Enterprise Config">>} )} - ]; -% ] ++ fields(influxdb); + ] ++ fields(influxdb); fields(influxdb) -> [ { - influxdb, - mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, influxdb_udp)), #{ + Protocol, + mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{ desc => <<"EMQX Enterprise Config">> }) } - % || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] + || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] ]. connector_examples(Method) -> @@ -52,4 +51,7 @@ connector_examples(Method) -> lists:foldl(Fun, #{}, schema_modules()). schema_modules() -> - [emqx_ee_connector_hstream, emqx_ee_connector_influxdb]. + [ + emqx_ee_connector_hstream, + emqx_ee_connector_influxdb + ]. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 1efc0b263..37915d2e8 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -11,8 +11,6 @@ -behaviour(emqx_resource). --define(PUT_FIELDS_FILTER, fun({Name, _}) -> not lists:member(Name, [type, name]) end). - %% callbacks of behaviour emqx_resource -export([ on_start/2, @@ -50,23 +48,29 @@ on_get_status(_InstId, #{client := Client}) -> %% schema fields("put_udp") -> - lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_udp)); + fields(influxdb_udp); fields("put_api_v1") -> - lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v1)); + fields(influxdb_api_v1); fields("put_api_v2") -> - lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v2)); + fields(influxdb_api_v2); fields("get_udp") -> - fields(influxdb_udp); + Key = influxdb_udp, + fields(Key) ++ type_name_field(Key); fields("get_api_v1") -> - fields(influxdb_api_v1); + Key = influxdb_api_v1, + fields(Key) ++ type_name_field(Key); fields("get_api_v2") -> - fields(influxdb_api_v2); + Key = influxdb_api_v2, + fields(Key) ++ type_name_field(Key); fields("post_udp") -> - fields(influxdb_udp); + Key = influxdb_udp, + fields(Key) ++ type_name_field(Key); fields("post_api_v1") -> - fields(influxdb_api_v1); + Key = influxdb_api_v1, + fields(Key) ++ type_name_field(Key); fields("post_api_v2") -> - fields(influxdb_api_v2); + Key = influxdb_api_v2, + fields(Key) ++ type_name_field(Key); fields(basic) -> [ {host, @@ -76,28 +80,29 @@ fields(basic) -> mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") })}, - {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}, - {name, mk(binary(), #{required => true, desc => ?DESC("name")})} + {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} ]; fields(influxdb_udp) -> - [ - {type, mk(influxdb_udp, #{required => true, desc => ?DESC("type")})} - ] ++ fields(basic); + fields(basic); fields(influxdb_api_v1) -> [ - {type, mk(influxdb_api_v1, #{required => true, desc => ?DESC("type")})}, {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, {username, mk(binary(), #{required => true, desc => ?DESC("username")})}, {password, mk(binary(), #{required => true, desc => ?DESC("password")})} ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); fields(influxdb_api_v2) -> [ - {type, mk(influxdb_api_v2, #{required => true, desc => ?DESC("type")})}, {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, {token, mk(binary(), #{required => true, desc => ?DESC("token")})} ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic). +type_name_field(Type) -> + [ + {type, mk(Type, #{required => true, desc => ?DESC("type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("name")})} + ]. + connector_examples(Method) -> [ #{ @@ -123,9 +128,8 @@ connector_examples(Method) -> values(Protocol, get) -> values(Protocol, post); values(Protocol, post) -> - Type = list_to_atom(io_lib:format("influxdb_~p", [Protocol])), - ConnectorName = list_to_binary(io_lib:format("~p_connector", [Protocol])), - maps:merge(values(Protocol, put), #{type => Type, name => ConnectorName}); + Type = list_to_atom("influxdb_" ++ atom_to_list(Protocol)), + maps:merge(values(Protocol, put), #{type => Type, name => <<"connector">>}); values(udp, put) -> #{ host => <<"127.0.0.1">>, @@ -219,7 +223,7 @@ do_start_client(InstId, ClientConfig, Config = #{egress := #{payload := PayloadB end. client_config( - _InstId, + InstId, Config = #{ host := Host, port := Port, @@ -230,7 +234,7 @@ client_config( {host, Host}, {port, Port}, {pool_size, PoolSize}, - {pool, atom_pname_todo}, + {pool, binary_to_atom(InstId, utf8)}, {precision, maps:get(precision, Config, ms)} ] ++ protocol_config(Config).