From 4c7ca2217ce53d5bb94e71b0b4b72e139d27687a Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 29 Jul 2022 18:36:48 +0800 Subject: [PATCH] fix: add influxdb udp api_v1 api_v2 connector --- .../src/emqx_ee_bridge_influxdb.erl | 6 +- lib-ee/emqx_ee_connector/rebar.config | 3 +- .../src/emqx_ee_connector.erl | 13 +- .../src/emqx_ee_connector_influxdb.erl | 124 ++++++++++++------ mix.exs | 6 + 5 files changed, 102 insertions(+), 50 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index d285c2621..7004b70a1 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -70,9 +70,9 @@ fields("get") -> field(connector) -> ConnectorConfigRef = [ - ref(emqx_ee_connector_influxdb, udp), - ref(emqx_ee_connector_influxdb, api_v1), - ref(emqx_ee_connector_influxdb, api_v2) + ref(emqx_ee_connector_influxdb, influxdb_udp), + ref(emqx_ee_connector_influxdb, influxdb_api_v1), + ref(emqx_ee_connector_influxdb, influxdb_api_v2) ], mk( hoconsc:union([binary() | ConnectorConfigRef]), diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 38194cbf5..0a3f6866b 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,6 +1,7 @@ {erl_opts, [debug_info]}. {deps, [ - {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}} + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.2"}}} ]}. {shell, [ diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl index d0884f945..55ccc2c2c 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.erl @@ -26,14 +26,17 @@ fields(connectors) -> hoconsc:map(name, ref(emqx_ee_connector_hstream, config)), #{desc => <<"EMQX Enterprise Config">>} )} - ] ++ fields(influxdb); + ]; +% ] ++ fields(influxdb); fields(influxdb) -> [ - {Protocol, - mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{ + { + influxdb, + mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, influxdb_udp)), #{ desc => <<"EMQX Enterprise Config">> - })} - || Protocol <- [udp, api_v1, api_v2] + }) + } + % || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] ]. connector_examples(Method) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 0ed7964b7..1efc0b263 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -32,37 +32,41 @@ on_start(InstId, Config) -> start_client(InstId, Config). -on_stop(_InstId, _State) -> - ok. +on_stop(_InstId, #{client := Client}) -> + influxdb:stop_client(Client). on_query(_InstId, {send_message, _Data}, _AfterQuery, _State) -> ok. -on_get_status(_InstId, _State) -> - % connected; - disconnected. +on_get_status(_InstId, #{client := Client}) -> + case influxdb:is_alive(Client) of + true -> + connected; + false -> + disconnected + end. %% ------------------------------------------------------------------------------------------------- %% schema fields("put_udp") -> - lists:filter(?PUT_FIELDS_FILTER, fields(udp)); + lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_udp)); fields("put_api_v1") -> - lists:filter(?PUT_FIELDS_FILTER, fields(api_v1)); + lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v1)); fields("put_api_v2") -> - lists:filter(?PUT_FIELDS_FILTER, fields(api_v2)); + lists:filter(?PUT_FIELDS_FILTER, fields(influxdb_api_v2)); fields("get_udp") -> - fields(udp); + fields(influxdb_udp); fields("get_api_v1") -> - fields(api_v1); + fields(influxdb_api_v1); fields("get_api_v2") -> - fields(api_v2); + fields(influxdb_api_v2); fields("post_udp") -> - fields(udp); + fields(influxdb_udp); fields("post_api_v1") -> - fields(api_v1); + fields(influxdb_api_v1); fields("post_api_v2") -> - fields(api_v2); + fields(influxdb_api_v2); fields(basic) -> [ {host, @@ -73,23 +77,22 @@ fields(basic) -> required => false, default => ms, desc => ?DESC("precision") })}, {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}, - {type, mk(enum([influxdb]), #{required => true, desc => ?DESC("type")})}, {name, mk(binary(), #{required => true, desc => ?DESC("name")})} ]; -fields(udp) -> +fields(influxdb_udp) -> [ - {protocol, mk(enum([udp]), #{required => true, desc => ?DESC("protocol_udp")})} + {type, mk(influxdb_udp, #{required => true, desc => ?DESC("type")})} ] ++ fields(basic); -fields(api_v1) -> +fields(influxdb_api_v1) -> [ - {protocol, mk(enum([api_v1]), #{required => true, desc => ?DESC("protocol_api_v1")})}, + {type, mk(influxdb_api_v1, #{required => true, desc => ?DESC("type")})}, {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, {username, mk(binary(), #{required => true, desc => ?DESC("username")})}, {password, mk(binary(), #{required => true, desc => ?DESC("password")})} ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); -fields(api_v2) -> +fields(influxdb_api_v2) -> [ - {protocol, mk(enum([api_v2]), #{required => true, desc => ?DESC("protocol_api_v2")})}, + {type, mk(influxdb_api_v2, #{required => true, desc => ?DESC("type")})}, {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, {token, mk(binary(), #{required => true, desc => ?DESC("token")})} @@ -120,11 +123,11 @@ connector_examples(Method) -> values(Protocol, get) -> values(Protocol, post); values(Protocol, post) -> + Type = list_to_atom(io_lib:format("influxdb_~p", [Protocol])), ConnectorName = list_to_binary(io_lib:format("~p_connector", [Protocol])), - maps:merge(values(Protocol, put), #{type => influxdb, name => ConnectorName}); + maps:merge(values(Protocol, put), #{type => Type, name => ConnectorName}); values(udp, put) -> #{ - protocol => udp, host => <<"127.0.0.1">>, port => 8089, precision => ms, @@ -132,7 +135,6 @@ values(udp, put) -> }; values(api_v1, put) -> #{ - protocol => api_v1, host => <<"127.0.0.1">>, port => 8086, precision => ms, @@ -144,7 +146,6 @@ values(api_v1, put) -> }; values(api_v2, put) -> #{ - protocol => api_v2, host => <<"127.0.0.1">>, port => 8086, precision => ms, @@ -158,23 +159,64 @@ values(api_v2, put) -> %% internal functions start_client(InstId, Config) -> - io:format("InstId ~p~n", [InstId]), - client_config(InstId, Config). + ClientConfig = client_config(InstId, Config), + ?SLOG(info, #{ + msg => "starting influxdb connector", + connector => InstId, + config => Config, + client_config => ClientConfig + }), + try + do_start_client(InstId, ClientConfig, Config) + catch + E:R:S -> + ?SLOG(error, #{ + msg => "start influxdb connector error", + connector => InstId, + error => E, + reason => R, + stack => S + }), + {error, R} + end. -% ClientConfig = client_config(InstId, Config), -% case influxdb:start_client(ClientConfig) of -% {ok, Client} -> -% true = influxdb:is_alive(Client), -% maybe_pool_size(Client, Params); -% {error, {already_started, Client0}} -> -% _ = influxdb:stop_client(Client0), -% {ok, Client} = influxdb:start_client(Options), -% true = influxdb:is_alive(Client), -% maybe_pool_size(Client, Params); -% {error, Reason} -> -% logger:log(error, "Initiate influxdb failed ~0p", [Reason]), -% error({start_pool_failed, ResId}) -% end. +do_start_client(InstId, ClientConfig, Config = #{egress := #{payload := PayloadBin}}) -> + case influxdb:start_client(ClientConfig) of + {ok, Client} -> + case influxdb:is_alive(Client) of + true -> + Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), + ?SLOG(info, #{ + msg => "starting influxdb connector success", + connector => InstId, + client => Client + }), + #{client => Client, payload => Payload}; + false -> + ?SLOG(error, #{ + msg => "starting influxdb connector failed", + connector => InstId, + client => Client, + reason => "client is not alive" + }), + {error, influxdb_client_not_alive} + end; + {error, {already_started, Client0}} -> + ?SLOG(info, #{ + msg => "starting influxdb connector,find already started client", + connector => InstId, + old_client => Client0 + }), + _ = influxdb:stop_client(Client0), + do_start_client(InstId, ClientConfig, Config); + {error, Reason} -> + ?SLOG(error, #{ + msg => "starting influxdb connector failed", + connector => InstId, + reason => Reason + }), + {error, Reason} + end. client_config( _InstId, diff --git a/mix.exs b/mix.exs index 6648aa35e..039172eb9 100644 --- a/mix.exs +++ b/mix.exs @@ -89,9 +89,15 @@ defmodule EMQXUmbrella.MixProject do github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true}, # in conflict by grpc and eetcd {:gpb, "4.11.2", override: true, runtime: false}, +<<<<<<< HEAD {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"} ] ++ umbrella_apps() ++ enterprise_apps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep() +======= + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.2"} + ] ++ umbrella_apps() ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep() +>>>>>>> fix: add influxdb udp api_v1 api_v2 connector end defp umbrella_apps() do