diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 0b885db8c..46039390d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -36,24 +36,31 @@ post_request() -> http_schema("post"). http_schema(Method) -> - Schemas = lists:flatmap( - fun(Type) -> - [ - ref(schema_mod(Type), Method ++ "_ingress"), - ref(schema_mod(Type), Method ++ "_egress") - ] - end, - ?CONN_TYPES - ), - ExtSchemas = [ref(Module, Method) || Module <- schema_modules()], - hoconsc:union(Schemas ++ ExtSchemas). + Broker = + lists:flatmap( + fun(Type) -> + [ + ref(schema_mod(Type), Method ++ "_ingress"), + ref(schema_mod(Type), Method ++ "_egress") + ] + end, + ?CONN_TYPES + ) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]], + EE = ee_schemas(Method), + hoconsc:union(Broker ++ EE). -if(?EMQX_RELEASE_EDITION == ee). -schema_modules() -> - [emqx_bridge_webhook_schema] ++ emqx_ee_bridge:schema_modules(). +ee_schemas(Method) -> + emqx_ee_bridge:api_schemas(Method). + +ee_fields_bridges() -> + emqx_ee_bridge:fields(bridges). -else. -schema_modules() -> - [emqx_bridge_webhook_schema]. +ee_schemas(_) -> + []. + +ee_fields_bridges() -> + []. -endif. common_bridge_fields(ConnectorRef) -> @@ -158,14 +165,6 @@ fields("node_status") -> {"status", mk(status(), #{})} ]. --if(?EMQX_RELEASE_EDITION == ee). -ee_fields_bridges() -> - emqx_ee_bridge:fields(bridges). --else. -ee_fields_bridges() -> - []. --endif. - desc(bridges) -> ?DESC("desc_bridges"); desc("metrics") -> diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index 65d51443b..f0c9479de 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -45,7 +45,7 @@ post_request() -> http_schema(Method) -> Broker = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES], - EE = [?R_REF(Module, Method) || Module <- schema_modules()], + EE = ee_schemas(Method), Schemas = Broker ++ EE, ?UNION(Schemas). @@ -70,8 +70,8 @@ fields("connectors") -> Broker ++ EE. -if(?EMQX_RELEASE_EDITION == ee). -schema_modules() -> - emqx_ee_connector:schema_modules(). +ee_schemas(Method) -> + emqx_ee_connector:api_schemas(Method). ee_fields_connectors() -> emqx_ee_connector:fields(connectors). @@ -79,7 +79,7 @@ ee_fields_connectors() -> ee_fields_connectors() -> []. -schema_modules() -> +ee_schemas(_) -> []. -endif. diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf index ad07ad377..2e4397a04 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf @@ -13,10 +13,10 @@ will be forwarded. 注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 HStreamDB。 """ } - label { - en: "Local Topic" - zh: "本地 Topic" - } + label { + en: "Local Topic" + zh: "本地 Topic" + } } payload { desc { diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf new file mode 100644 index 000000000..3930825e5 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -0,0 +1,124 @@ +emqx_ee_bridge_influxdb { + local_topic { + desc { + en: """ +The MQTT topic filter to be forwarded to the InfluxDB. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """ +发送到 'local_topic' 的消息都会转发到 InfluxDB。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 InfluxDB。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + measurement { + desc { + en: """The measurement name to be forwarded to the InfluxDB. Placeholders supported.""" + zh: """要转发到 InfluxDB 的 Measurement 名称,支持占位符""" + } + label { + en: "Measurement" + zh: "Measurement" + } + } + timestamp { + desc { + en: """The timestamp to be forwarded to the InfluxDB. Placeholders supported. Default is message timestamp""" + zh: """要转发到 InfluxDB 的时间戳,支持占位符。默认使用消息的时间戳""" + } + label { + en: "Timestamp" + zh: "Timestamp" + } + } + tags { + desc { + en: """The tags to be forwarded to the InfluxDB. Placeholders supported.""" + zh: """要转发到 InfluxDB 的 Tags 数据内容,支持占位符""" + } + label { + en: "Tags" + zh: "Tags" + } + } + fields { + desc { + en: """The fields to be forwarded to the InfluxDB. Placeholders supported.""" + zh: """要转发到 InfluxDB 的 fields 数据内容,支持占位符""" + } + label { + en: "Fields" + zh: "Fields" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + config_direction { + desc { + en: """The direction of this bridge, MUST be 'egress'""" + zh: """桥接的方向, 必须是 egress""" + } + label { + en: "Bridge Direction" + zh: "桥接方向" + } + } + + desc_config { + desc { + en: """Configuration for an InfluxDB bridge.""" + zh: """InfluxDB 桥接配置""" + } + label: { + en: "InfluxDB Bridge Configuration" + zh: "InfluxDB 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name, used as a human-readable description of the bridge.""" + zh: """桥接名字,可读描述""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } + desc_connector { + desc { + en: """Generic configuration for the connector.""" + zh: """连接器的通用配置。""" + } + label: { + en: "Connector Generic Configuration" + zh: "连接器通用配置。" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 0ed8ee1fe..881609bfc 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -6,25 +6,43 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - schema_modules/0, + api_schemas/1, conn_bridge_examples/1, resource_type/1, fields/1 ]). +api_schemas(Method) -> + [ + ref(emqx_ee_bridge_hstream, Method), + ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), + ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), + ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2") + ]. + schema_modules() -> - [emqx_ee_bridge_hstream]. + [ + emqx_ee_bridge_hstream, + emqx_ee_bridge_influxdb + ]. conn_bridge_examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, Fun = fun(Module, Examples) -> - Example = erlang:apply(Module, conn_bridge_example, [Method]), - maps:merge(Examples, Example) + ConnectorExamples = erlang:apply(Module, conn_bridge_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) end, lists:foldl(Fun, #{}, schema_modules()). +resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(hstreamdb) -> emqx_ee_connector_hstream; -resource_type(<<"hstreamdb">>) -> emqx_ee_connector_hstream. +resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; +resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; +resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb. fields(bridges) -> [ @@ -33,4 +51,13 @@ fields(bridges) -> hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), #{desc => <<"EMQX Enterprise Config">>} )} + ] ++ fields(influxdb); +fields(influxdb) -> + [ + {Protocol, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_influxdb, Protocol)), + #{desc => <<"EMQX Enterprise Config">>} + )} + || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl index 73f7a20eb..200e695da 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl @@ -10,7 +10,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_example/1 + conn_bridge_examples/1 ]). -export([ @@ -23,13 +23,15 @@ %% ------------------------------------------------------------------------------------------------- %% api -conn_bridge_example(Method) -> - #{ - <<"hstreamdb">> => #{ - summary => <<"HStreamDB Bridge">>, - value => values(Method) +conn_bridge_examples(Method) -> + [ + #{ + <<"hstreamdb">> => #{ + summary => <<"HStreamDB Bridge">>, + value => values(Method) + } } - }. + ]. values(get) -> maps:merge(values(post), ?METRICS_EXAMPLE); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl new file mode 100644 index 000000000..a19459208 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -0,0 +1,144 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_influxdb). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_ee_bridge.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"influxdb_udp">> => #{ + summary => <<"InfluxDB UDP Bridge">>, + value => values("influxdb_udp", Method) + } + }, + #{ + <<"influxdb_api_v1">> => #{ + summary => <<"InfluxDB HTTP API V1 Bridge">>, + value => values("influxdb_api_v1", Method) + } + }, + #{ + <<"influxdb_api_v2">> => #{ + summary => <<"InfluxDB HTTP API V2 Bridge">>, + value => values("influxdb_api_v2", Method) + } + } + ]. + +values(Protocol, get) -> + maps:merge(values(Protocol, post), ?METRICS_EXAMPLE); +values(Protocol, post) -> + #{ + type => list_to_atom(Protocol), + name => <<"demo">>, + connector => list_to_binary(Protocol ++ ":connector"), + enable => true, + direction => egress, + local_topic => <<"local/topic/#">>, + measurement => <<"${topic}">>, + tags => #{<<"clientid">> => <<"${clientid}">>}, + fields => #{ + <<"payload">> => <<"${payload}">>, + <<"int_value">> => [int, <<"${payload.int_key}">>], + <<"uint_value">> => [uint, <<"${payload.uint_key}">>] + } + }; +values(Protocol, put) -> + values(Protocol, post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge". + +roots() -> []. + +fields(basic) -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, + {measurement, mk(binary(), #{desc => ?DESC("measurement"), required => true})}, + {timestamp, + mk(binary(), #{ + desc => ?DESC("timestamp"), default => <<"${timestamp}">>, required => false + })}, + {tags, mk(map(), #{desc => ?DESC("tags"), required => false})}, + {fields, mk(map(), #{desc => ?DESC("fields"), required => true})} + ]; +fields("post_udp") -> + method_fileds(post, influxdb_udp); +fields("post_api_v1") -> + method_fileds(post, influxdb_api_v1); +fields("post_api_v2") -> + method_fileds(post, influxdb_api_v2); +fields("put_udp") -> + method_fileds(put, influxdb_udp); +fields("put_api_v1") -> + method_fileds(put, influxdb_api_v1); +fields("put_api_v2") -> + method_fileds(put, influxdb_api_v2); +fields("get_udp") -> + method_fileds(get, influxdb_udp); +fields("get_api_v1") -> + method_fileds(get, influxdb_api_v1); +fields("get_api_v2") -> + method_fileds(get, influxdb_api_v2); +fields(Name) when + Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 +-> + fields(basic) ++ connector_field(Name). + +method_fileds(post, ConnectorType) -> + fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType); +method_fileds(get, ConnectorType) -> + fields(basic) ++ + emqx_bridge_schema:metrics_status_fields() ++ + connector_field(ConnectorType) ++ type_name_field(ConnectorType); +method_fileds(put, ConnectorType) -> + fields(basic) ++ connector_field(ConnectorType). + +connector_field(Type) -> + [ + {connector, + mk( + hoconsc:union([binary(), ref(emqx_ee_connector_influxdb, Type)]), + #{ + required => true, + example => list_to_binary(atom_to_list(Type) ++ ":connector"), + desc => ?DESC(<<"desc_connector">>) + } + )} + ]. + +type_name_field(Type) -> + [ + {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for HStream using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf new file mode 100644 index 000000000..a909c9a72 --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf @@ -0,0 +1,165 @@ + +emqx_ee_connector_influxdb { + type { + desc { + en: """The Connector Type.""" + zh: """连接器类型。""" + } + label: { + en: """Connector Type""" + zh: """连接器类型""" + } + } + + name { + desc { + en: """Connector name, used as a human-readable description of the connector.""" + zh: """连接器名称,人类可读的连接器描述。""" + } + label: { + en: """Connector Name""" + zh: """连接器名称""" + } + } + host { + desc { + en: """InfluxDB host.""" + zh: """InfluxDB 主机地址。""" + } + label: { + en: """Host""" + zh: """主机""" + } + } + port { + desc { + en: """InfluxDB port.""" + zh: """InfluxDB 端口。""" + } + label: { + en: """Port""" + zh: """端口""" + } + } + protocol { + desc { + en: """InfluxDB protocol. UDP or HTTP API or HTTP API V2""" + zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2""" + } + label: { + en: """Protocol""" + zh: """协议""" + } + } + protocol_udp { + desc { + en: """InfluxDB protocol.""" + zh: """InfluxDB UDP 协议""" + } + label: { + en: """UDP Protocol""" + zh: """UDP 协议""" + } + } + protocol_api_v1 { + desc { + en: """InfluxDB protocol. Support InfluxDB v1.8 and before.""" + zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本""" + } + label: { + en: """HTTP API Protocol""" + zh: """HTTP API 协议""" + } + } + protocol_api_v2 { + desc { + en: """InfluxDB protocol. Support InfluxDB v2.0 and after.""" + zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本""" + } + label: { + en: """HTTP API V2 Protocol""" + zh: """HTTP API V2 协议""" + } + } + database { + desc { + en: """InfluxDB database.""" + zh: """InfluxDB 数据库。""" + } + label: { + en: "Database" + zh: "数据库" + } + } + username { + desc { + en: "InfluxDB username." + zh: "InfluxDB 用户名。" + } + label: { + en: "Username" + zh: "用户名" + } + } + password { + desc { + en: "InfluxDB password." + zh: "InfluxDB 密码。" + } + label: { + en: "Password" + zh: "密码" + } + } + bucket { + desc { + en: "InfluxDB bucket name." + zh: "InfluxDB bucket 名称" + } + label: { + en: "Bucket" + zh: "Bucket" + } + } + org { + desc { + en: """InfluxDB organization name.""" + zh: """InfluxDB 组织名称。""" + } + label: { + en: """Organization""" + zh: """组织""" + } + } + token { + desc { + en: """InfluxDB token.""" + zh: """InfluxDB token。""" + } + label: { + en: """Token""" + zh: """Token""" + } + } + precision { + desc { + en: """InfluxDB time precision.""" + zh: """InfluxDB 时间精度。""" + } + label: { + en: """Time Precision""" + zh: """时间精度""" + } + } + pool_size { + desc { + en: """InfluxDB Pool Size""" + zh: """InfluxDB 连接池大小""" + } + label { + en: """InfluxDB Pool Size""" + zh: """InfluxDB 连接池大小""" + } + } + +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 38194cbf5..485b0120d 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,6 +1,7 @@ {erl_opts, [debug_info]}. {deps, [ - {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}} + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.3"}}} ]}. {shell, [ diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 6d9fea3c9..675a934aa 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -4,7 +4,8 @@ {applications, [ kernel, stdlib, - hstreamdb_erl + hstreamdb_erl, + influxdb ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl index 7d175f0eb..9e8e7bac1 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl @@ -6,13 +6,18 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - schema_modules/0, fields/1, - connector_examples/1 + connector_examples/1, + api_schemas/1 ]). -schema_modules() -> - [emqx_ee_connector_hstream]. +api_schemas(Method) -> + [ + ref(emqx_ee_connector_hstream, Method), + ref(emqx_ee_connector_influxdb, Method ++ "_udp"), + ref(emqx_ee_connector_influxdb, Method ++ "_api_v1"), + ref(emqx_ee_connector_influxdb, Method ++ "_api_v2") + ]. fields(connectors) -> [ @@ -21,12 +26,32 @@ fields(connectors) -> hoconsc:map(name, ref(emqx_ee_connector_hstream, config)), #{desc => <<"EMQX Enterprise Config">>} )} + ] ++ fields(influxdb); +fields(influxdb) -> + [ + { + Protocol, + mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{ + desc => <<"EMQX Enterprise Config">> + }) + } + || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] ]. connector_examples(Method) -> - Fun = - fun(Module, Examples) -> - Example = erlang:apply(Module, connector_example, [Method]), + MergeFun = + fun(Example, Examples) -> maps:merge(Examples, Example) end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, connector_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, lists:foldl(Fun, #{}, schema_modules()). + +schema_modules() -> + [ + emqx_ee_connector_hstream, + emqx_ee_connector_influxdb + ]. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl index 00c7e6f61..7a8ce3fe9 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl @@ -26,7 +26,7 @@ -export([ roots/0, fields/1, - connector_example/1 + connector_examples/1 ]). %% ------------------------------------------------------------------------------------------------- @@ -96,13 +96,15 @@ fields("post") -> {name, mk(binary(), #{required => true, desc => ?DESC("name")})} ] ++ fields("put"). -connector_example(Method) -> - #{ - <<"hstreamdb">> => #{ - summary => <<"HStreamDB Connector">>, - value => values(Method) +connector_examples(Method) -> + [ + #{ + <<"hstreamdb">> => #{ + summary => <<"HStreamDB Connector">>, + value => values(Method) + } } - }. + ]. values(post) -> maps:merge(values(put), #{name => <<"connector">>}); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl new file mode 100644 index 000000000..c6528ffd0 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -0,0 +1,410 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector_influxdb). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). + +-export([ + fields/1, + connector_examples/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% resource callback + +on_start(InstId, Config) -> + start_client(InstId, Config). + +on_stop(_InstId, #{client := Client}) -> + influxdb:stop_client(Client). + +on_query(InstId, {send_message, Data}, AfterQuery, State) -> + do_query(InstId, {send_message, Data}, AfterQuery, State). + +on_get_status(_InstId, #{client := Client}) -> + case influxdb:is_alive(Client) of + true -> + connected; + false -> + disconnected + end. + +%% ------------------------------------------------------------------------------------------------- +%% schema + +fields("put_udp") -> + fields(influxdb_udp); +fields("put_api_v1") -> + fields(influxdb_api_v1); +fields("put_api_v2") -> + fields(influxdb_api_v2); +fields("get_udp") -> + Key = influxdb_udp, + fields(Key) ++ type_name_field(Key); +fields("get_api_v1") -> + Key = influxdb_api_v1, + fields(Key) ++ type_name_field(Key); +fields("get_api_v2") -> + Key = influxdb_api_v2, + fields(Key) ++ type_name_field(Key); +fields("post_udp") -> + Key = influxdb_udp, + fields(Key) ++ type_name_field(Key); +fields("post_api_v1") -> + Key = influxdb_api_v1, + fields(Key) ++ type_name_field(Key); +fields("post_api_v2") -> + Key = influxdb_api_v2, + fields(Key) ++ type_name_field(Key); +fields(basic) -> + [ + {host, + mk(binary(), #{required => true, default => <<"120.0.0.1">>, desc => ?DESC("host")})}, + {port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})}, + {precision, + mk(enum([ns, us, ms, s, m, h]), #{ + required => false, default => ms, desc => ?DESC("precision") + })}, + {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} + ]; +fields(influxdb_udp) -> + fields(basic); +fields(influxdb_api_v1) -> + [ + {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, + {username, mk(binary(), #{required => true, desc => ?DESC("username")})}, + {password, mk(binary(), #{required => true, desc => ?DESC("password")})} + ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); +fields(influxdb_api_v2) -> + [ + {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, + {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, + {token, mk(binary(), #{required => true, desc => ?DESC("token")})} + ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic). + +type_name_field(Type) -> + [ + {type, mk(Type, #{required => true, desc => ?DESC("type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("name")})} + ]. + +connector_examples(Method) -> + [ + #{ + <<"influxdb_udp">> => #{ + summary => <<"InfluxDB UDP Connector">>, + value => values(udp, Method) + } + }, + #{ + <<"influxdb_api_v1">> => #{ + summary => <<"InfluxDB HTTP API V1 Connector">>, + value => values(api_v1, Method) + } + }, + #{ + <<"influxdb_api_v2">> => #{ + summary => <<"InfluxDB HTTP API V2 Connector">>, + value => values(api_v2, Method) + } + } + ]. + +values(Protocol, get) -> + values(Protocol, post); +values(Protocol, post) -> + Type = list_to_atom("influxdb_" ++ atom_to_list(Protocol)), + maps:merge(values(Protocol, put), #{type => Type, name => <<"connector">>}); +values(udp, put) -> + #{ + host => <<"127.0.0.1">>, + port => 8089, + precision => ms, + pool_size => 8 + }; +values(api_v1, put) -> + #{ + host => <<"127.0.0.1">>, + port => 8086, + precision => ms, + pool_size => 8, + database => <<"my_db">>, + username => <<"my_user">>, + password => <<"my_password">>, + ssl => #{enable => false} + }; +values(api_v2, put) -> + #{ + host => <<"127.0.0.1">>, + port => 8086, + precision => ms, + pool_size => 8, + bucket => <<"my_bucket">>, + org => <<"my_org">>, + token => <<"my_token">>, + ssl => #{enable => false} + }. +%% ------------------------------------------------------------------------------------------------- +%% internal functions + +start_client(InstId, Config) -> + ClientConfig = client_config(InstId, Config), + ?SLOG(info, #{ + msg => "starting influxdb connector", + connector => InstId, + config => Config, + client_config => ClientConfig + }), + try + do_start_client(InstId, ClientConfig, Config) + catch + E:R:S -> + ?SLOG(error, #{ + msg => "start influxdb connector error", + connector => InstId, + error => E, + reason => R, + stack => S + }), + {error, R} + end. + +do_start_client( + InstId, + ClientConfig, + Config = #{ + egress := #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } + } +) -> + case influxdb:start_client(ClientConfig) of + {ok, Client} -> + case influxdb:is_alive(Client) of + true -> + State = #{ + client => Client, + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp), + tags => to_tags_config(Tags), + fields => to_fields_config(Fields) + }, + ?SLOG(info, #{ + msg => "starting influxdb connector success", + connector => InstId, + client => Client, + state => State + }), + {ok, State}; + false -> + ?SLOG(error, #{ + msg => "starting influxdb connector failed", + connector => InstId, + client => Client, + reason => "client is not alive" + }), + {error, influxdb_client_not_alive} + end; + {error, {already_started, Client0}} -> + ?SLOG(info, #{ + msg => "starting influxdb connector,find already started client", + connector => InstId, + old_client => Client0 + }), + _ = influxdb:stop_client(Client0), + do_start_client(InstId, ClientConfig, Config); + {error, Reason} -> + ?SLOG(error, #{ + msg => "starting influxdb connector failed", + connector => InstId, + reason => Reason + }), + {error, Reason} + end. + +client_config( + InstId, + Config = #{ + host := Host, + port := Port, + pool_size := PoolSize + } +) -> + [ + {host, binary_to_list(Host)}, + {port, Port}, + {pool_size, PoolSize}, + {pool, binary_to_atom(InstId, utf8)}, + {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} + ] ++ protocol_config(Config). + +%% api v2 config +protocol_config(#{ + username := Username, + password := Password, + database := DB, + ssl := SSL +}) -> + [ + {protocol, http}, + {version, v1}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {database, binary_to_list(DB)} + ] ++ ssl_config(SSL); +%% api v1 config +protocol_config(#{ + bucket := Bucket, + org := Org, + token := Token, + ssl := SSL +}) -> + [ + {protocol, http}, + {version, v2}, + {bucket, binary_to_list(Bucket)}, + {org, binary_to_list(Org)}, + {token, Token} + ] ++ ssl_config(SSL); +%% udp config +protocol_config(_) -> + [ + {protocol, udp} + ]. + +ssl_config(#{enable := false}) -> + [ + {https_enabled, false} + ]; +ssl_config(SSL = #{enable := true}) -> + [ + {https_enabled, true}, + {transport, ssl} + ] ++ maps:to_list(maps:remove(enable, SSL)). + +%% ------------------------------------------------------------------------------------------------- +%% Query + +do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) -> + case data_to_point(Data, State) of + {ok, Point} -> + case influxdb:write(Client, [Point]) of + ok -> + ?SLOG(debug, #{ + msg => "influxdb write point success", + connector => InstId, + point => Point + }), + emqx_resource:query_success(AfterQuery); + {error, Reason} -> + ?SLOG(error, #{ + msg => "influxdb write point failed", + connector => InstId, + reason => Reason + }), + emqx_resource:query_failed(AfterQuery) + end; + {error, Reason} -> + ?SLOG(error, #{ + msg => "influxdb trans point failed", + connector => InstId, + reason => Reason + }), + {error, Reason} + end. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Config Trans + +to_tags_config(Tags) -> + maps:fold(fun to_maps_config/3, #{}, Tags). + +to_fields_config(Fields) -> + maps:fold(fun to_maps_config/3, #{}, Fields). + +to_maps_config(K, [IntType, V], Res) when IntType == <<"int">> orelse IntType == <<"uint">> -> + NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), + Res#{NK => {binary_to_atom(IntType, utf8), NV}}; +to_maps_config(K, V, Res) -> + NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), + Res#{NK => NV}. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Data Trans +data_to_point( + Data, + #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } +) -> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of + [TimestampInt] when is_integer(TimestampInt) -> + {_, EncodeTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), + {_, EncodeFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + Point = #{ + measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), + timestamp => TimestampInt, + tags => EncodeTags, + fields => EncodeFields + }, + {ok, Point}; + BadTimestamp -> + {error, {bad_timestamp, BadTimestamp}} + end. + +maps_config_to_data(K, {IntType, V}, {Data, Res}) when IntType == int orelse IntType == uint -> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions), + NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions), + case {NK, NV} of + {[undefined], _} -> + {Data, Res}; + {_, [undefined]} -> + {Data, Res}; + {_, [IntV]} when is_integer(IntV) -> + {Data, Res#{NK => {IntType, IntV}}} + end; +maps_config_to_data(K, V, {Data, Res}) -> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions), + NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions), + case {NK, NV} of + {[undefined], _} -> + {Data, Res}; + {_, [undefined]} -> + {Data, Res}; + _ -> + {Data, Res#{bin(NK) => NV}} + end. + +data_filter(undefined) -> undefined; +data_filter(Int) when is_integer(Int) -> Int; +data_filter(Number) when is_number(Number) -> Number; +data_filter(Bool) when is_boolean(Bool) -> Bool; +data_filter(Data) -> bin(Data). + +bin(Data) -> emqx_plugin_libs_rule:bin(Data). diff --git a/mix.exs b/mix.exs index 6648aa35e..77438d213 100644 --- a/mix.exs +++ b/mix.exs @@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do {:lc, github: "emqx/lc", tag: "0.3.1"}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.3.0"}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.3.0", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, @@ -56,7 +56,7 @@ defmodule EMQXUmbrella.MixProject do {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.5", override: true}, - {:ecpool, github: "emqx/ecpool", tag: "0.5.2"}, + {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true}, {:replayq, "0.3.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, {:emqtt, github: "emqx/emqtt", tag: "1.6.0", override: true}, @@ -89,7 +89,8 @@ defmodule EMQXUmbrella.MixProject do github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true}, # in conflict by grpc and eetcd {:gpb, "4.11.2", override: true, runtime: false}, - {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"} + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true} ] ++ umbrella_apps() ++ enterprise_apps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep() end