refactor: split influxdb bridges to actions and connectors
This commit is contained in:
parent
071d15d13e
commit
7c7a0b2172
|
@ -8,7 +8,7 @@
|
||||||
emqx_resource,
|
emqx_resource,
|
||||||
influxdb
|
influxdb
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, [{emqx_action_info_modules, [emqx_bridge_influxdb_action_info]}]},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{links, []}
|
{links, []}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -11,7 +11,6 @@
|
||||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
conn_bridge_examples/1,
|
|
||||||
write_syntax_type/0
|
write_syntax_type/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -22,17 +21,28 @@
|
||||||
desc/1
|
desc/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% Examples
|
||||||
|
-export([
|
||||||
|
bridge_v2_examples/1,
|
||||||
|
conn_bridge_examples/1,
|
||||||
|
connector_examples/1
|
||||||
|
]).
|
||||||
|
|
||||||
-type write_syntax() :: list().
|
-type write_syntax() :: list().
|
||||||
-reflect_type([write_syntax/0]).
|
-reflect_type([write_syntax/0]).
|
||||||
-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}).
|
-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}).
|
||||||
-export([to_influx_lines/1]).
|
-export([to_influx_lines/1]).
|
||||||
|
|
||||||
|
-define(CONNECTOR_TYPE, influxdb).
|
||||||
|
-define(ACTION_TYPE, influxdb).
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% api
|
%% api
|
||||||
|
|
||||||
write_syntax_type() ->
|
write_syntax_type() ->
|
||||||
typerefl:alias("string", write_syntax()).
|
typerefl:alias("string", write_syntax()).
|
||||||
|
|
||||||
|
%% Examples
|
||||||
conn_bridge_examples(Method) ->
|
conn_bridge_examples(Method) ->
|
||||||
[
|
[
|
||||||
#{
|
#{
|
||||||
|
@ -49,25 +59,80 @@ conn_bridge_examples(Method) ->
|
||||||
}
|
}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
bridge_v2_examples(Method) ->
|
||||||
|
WriteExample =
|
||||||
|
<<"${topic},clientid=${clientid} ", "payload=${payload},",
|
||||||
|
"${clientid}_int_value=${payload.int_key}i,", "bool=${payload.bool}">>,
|
||||||
|
ParamsExample = #{
|
||||||
|
parameters => #{
|
||||||
|
write_syntax => WriteExample, precision => ms
|
||||||
|
}
|
||||||
|
},
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"influxdb">> => #{
|
||||||
|
summary => <<"InfluxDB Action">>,
|
||||||
|
value => emqx_bridge_v2_schema:action_values(
|
||||||
|
Method, influxdb, influxdb, ParamsExample
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
].
|
||||||
|
|
||||||
|
connector_examples(Method) ->
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"influxdb_api_v1">> => #{
|
||||||
|
summary => <<"InfluxDB HTTP API V1 Connector">>,
|
||||||
|
value => emqx_connector_schema:connector_values(
|
||||||
|
Method, influxdb, connector_values(influxdb_api_v1)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
<<"influxdb_api_v2">> => #{
|
||||||
|
summary => <<"InfluxDB HTTP API V2 Connector">>,
|
||||||
|
value => emqx_connector_schema:connector_values(
|
||||||
|
Method, influxdb, connector_values(influxdb_api_v2)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
].
|
||||||
|
|
||||||
|
connector_values(Type) ->
|
||||||
|
maps:merge(basic_connector_values(), #{parameters => connector_values_v(Type)}).
|
||||||
|
|
||||||
|
connector_values_v(influxdb_api_v2) ->
|
||||||
|
#{
|
||||||
|
influxdb_type => influxdb_api_v2,
|
||||||
|
bucket => <<"example_bucket">>,
|
||||||
|
org => <<"examlpe_org">>,
|
||||||
|
token => <<"example_token">>
|
||||||
|
};
|
||||||
|
connector_values_v(influxdb_api_v1) ->
|
||||||
|
#{
|
||||||
|
influxdb_type => influxdb_api_v1,
|
||||||
|
database => <<"example_database">>,
|
||||||
|
username => <<"example_username">>,
|
||||||
|
password => <<"******">>
|
||||||
|
}.
|
||||||
|
|
||||||
|
basic_connector_values() ->
|
||||||
|
#{
|
||||||
|
enable => true,
|
||||||
|
server => <<"127.0.0.1:8086">>,
|
||||||
|
ssl => #{enable => false}
|
||||||
|
}.
|
||||||
|
|
||||||
values(Protocol, get) ->
|
values(Protocol, get) ->
|
||||||
values(Protocol, post);
|
values(Protocol, post);
|
||||||
values("influxdb_api_v2", post) ->
|
values("influxdb_api_v2", post) ->
|
||||||
SupportUint = <<"uint_value=${payload.uint_key}u,">>,
|
SupportUint = <<"uint_value=${payload.uint_key}u,">>,
|
||||||
TypeOpts = #{
|
TypeOpts = connector_values_v(influxdb_api_v2),
|
||||||
bucket => <<"example_bucket">>,
|
|
||||||
org => <<"examlpe_org">>,
|
|
||||||
token => <<"example_token">>,
|
|
||||||
server => <<"127.0.0.1:8086">>
|
|
||||||
},
|
|
||||||
values(common, "influxdb_api_v2", SupportUint, TypeOpts);
|
values(common, "influxdb_api_v2", SupportUint, TypeOpts);
|
||||||
values("influxdb_api_v1", post) ->
|
values("influxdb_api_v1", post) ->
|
||||||
SupportUint = <<>>,
|
SupportUint = <<>>,
|
||||||
TypeOpts = #{
|
TypeOpts = connector_values_v(influxdb_api_v1),
|
||||||
database => <<"example_database">>,
|
|
||||||
username => <<"example_username">>,
|
|
||||||
password => <<"******">>,
|
|
||||||
server => <<"127.0.0.1:8086">>
|
|
||||||
},
|
|
||||||
values(common, "influxdb_api_v1", SupportUint, TypeOpts);
|
values(common, "influxdb_api_v1", SupportUint, TypeOpts);
|
||||||
values(Protocol, put) ->
|
values(Protocol, put) ->
|
||||||
values(Protocol, post).
|
values(Protocol, post).
|
||||||
|
@ -98,6 +163,10 @@ namespace() -> "bridge_influxdb".
|
||||||
|
|
||||||
roots() -> [].
|
roots() -> [].
|
||||||
|
|
||||||
|
fields("config_connector") ->
|
||||||
|
emqx_connector_schema:common_fields() ++
|
||||||
|
connection_fields() ++
|
||||||
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||||
fields("post_api_v1") ->
|
fields("post_api_v1") ->
|
||||||
method_fields(post, influxdb_api_v1);
|
method_fields(post, influxdb_api_v1);
|
||||||
fields("post_api_v2") ->
|
fields("post_api_v2") ->
|
||||||
|
@ -110,12 +179,59 @@ fields("get_api_v1") ->
|
||||||
method_fields(get, influxdb_api_v1);
|
method_fields(get, influxdb_api_v1);
|
||||||
fields("get_api_v2") ->
|
fields("get_api_v2") ->
|
||||||
method_fields(get, influxdb_api_v2);
|
method_fields(get, influxdb_api_v2);
|
||||||
|
fields(action) ->
|
||||||
|
{influxdb,
|
||||||
|
mk(
|
||||||
|
hoconsc:map(name, ref(?MODULE, influxdb_action)),
|
||||||
|
#{desc => <<"InfluxDB Action Config">>, required => false}
|
||||||
|
)};
|
||||||
|
fields(influxdb_action) ->
|
||||||
|
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||||
|
mk(ref(?MODULE, action_parameters), #{
|
||||||
|
required => true, desc => ?DESC(action_parameters)
|
||||||
|
})
|
||||||
|
);
|
||||||
|
fields(action_parameters) ->
|
||||||
|
[
|
||||||
|
{write_syntax, fun write_syntax/1},
|
||||||
|
emqx_bridge_influxdb_connector:precision_field()
|
||||||
|
];
|
||||||
|
fields(connector_resource_opts) ->
|
||||||
|
emqx_connector_schema:resource_opts_fields();
|
||||||
|
fields(Field) when
|
||||||
|
Field == "get_connector";
|
||||||
|
Field == "put_connector";
|
||||||
|
Field == "post_connector"
|
||||||
|
->
|
||||||
|
Fields =
|
||||||
|
connection_fields() ++
|
||||||
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
|
||||||
|
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
|
||||||
|
fields(Field) when
|
||||||
|
Field == "get_bridge_v2";
|
||||||
|
Field == "post_bridge_v2";
|
||||||
|
Field == "put_bridge_v2"
|
||||||
|
->
|
||||||
|
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(influxdb_action));
|
||||||
fields(Type) when
|
fields(Type) when
|
||||||
Type == influxdb_api_v1 orelse Type == influxdb_api_v2
|
Type == influxdb_api_v1 orelse Type == influxdb_api_v2
|
||||||
->
|
->
|
||||||
influxdb_bridge_common_fields() ++
|
influxdb_bridge_common_fields() ++
|
||||||
connector_fields(Type).
|
connector_fields(Type).
|
||||||
|
|
||||||
|
connection_fields() ->
|
||||||
|
[
|
||||||
|
emqx_bridge_influxdb_connector:server_field(),
|
||||||
|
{parameters,
|
||||||
|
mk(
|
||||||
|
hoconsc:union([
|
||||||
|
ref(emqx_bridge_influxdb_connector, "connector_" ++ T)
|
||||||
|
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
|
||||||
|
]),
|
||||||
|
#{required => true, desc => ?DESC("influxdb_parameters")}
|
||||||
|
)}
|
||||||
|
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
method_fields(post, ConnectorType) ->
|
method_fields(post, ConnectorType) ->
|
||||||
influxdb_bridge_common_fields() ++
|
influxdb_bridge_common_fields() ++
|
||||||
connector_fields(ConnectorType) ++
|
connector_fields(ConnectorType) ++
|
||||||
|
@ -154,6 +270,10 @@ desc(influxdb_api_v1) ->
|
||||||
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1");
|
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1");
|
||||||
desc(influxdb_api_v2) ->
|
desc(influxdb_api_v2) ->
|
||||||
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2");
|
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2");
|
||||||
|
desc(influxdb_action) ->
|
||||||
|
?DESC(influxdb_action);
|
||||||
|
desc(action_parameters) ->
|
||||||
|
?DESC(action_parameters);
|
||||||
desc(_) ->
|
desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
-module(emqx_bridge_influxdb_action_info).
|
||||||
|
|
||||||
|
-behaviour(emqx_action_info).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
bridge_v1_config_to_action_config/2,
|
||||||
|
bridge_v1_config_to_connector_config/1,
|
||||||
|
connector_action_config_to_bridge_v1_config/2,
|
||||||
|
bridge_v1_type_name/0,
|
||||||
|
action_type_name/0,
|
||||||
|
connector_type_name/0,
|
||||||
|
schema_module/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% dynamic callback
|
||||||
|
-export([
|
||||||
|
bridge_v1_type_name_fun/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-import(emqx_utils_conv, [bin/1]).
|
||||||
|
|
||||||
|
-define(SCHEMA_MODULE, emqx_bridge_influxdb).
|
||||||
|
|
||||||
|
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||||
|
ActionTopLevelKeys = schema_keys(influxdb_action),
|
||||||
|
ActionParametersKeys = schema_keys(action_parameters),
|
||||||
|
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||||
|
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
|
||||||
|
emqx_utils_maps:update_if_present(
|
||||||
|
<<"resource_opts">>,
|
||||||
|
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
|
||||||
|
ActionConfig#{<<"connector">> => ConnectorName}
|
||||||
|
).
|
||||||
|
|
||||||
|
bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
||||||
|
ActionTopLevelKeys = schema_keys(influxdb_action),
|
||||||
|
ActionParametersKeys = schema_keys(action_parameters),
|
||||||
|
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||||
|
ConnectorTopLevelKeys = schema_keys("config_connector"),
|
||||||
|
ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
|
||||||
|
ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
|
||||||
|
ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config),
|
||||||
|
emqx_utils_maps:update_if_present(
|
||||||
|
<<"resource_opts">>,
|
||||||
|
fun emqx_connector_schema:project_to_connector_resource_opts/1,
|
||||||
|
ConnConfig0
|
||||||
|
).
|
||||||
|
|
||||||
|
connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
|
||||||
|
RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config(
|
||||||
|
ConnectorRawConf, ActionRawConf
|
||||||
|
),
|
||||||
|
maps:without([<<"influxdb_type">>], RawConf).
|
||||||
|
|
||||||
|
bridge_v1_type_name() ->
|
||||||
|
{fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
|
||||||
|
|
||||||
|
action_type_name() -> influxdb.
|
||||||
|
|
||||||
|
connector_type_name() -> influxdb.
|
||||||
|
|
||||||
|
schema_module() -> ?SCHEMA_MODULE.
|
||||||
|
|
||||||
|
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"influxdb_type">> := Type}}, _}) ->
|
||||||
|
v1_type(Type).
|
||||||
|
|
||||||
|
v1_type(<<"influxdb_api_v1">>) -> influxdb_api_v1;
|
||||||
|
v1_type(<<"influxdb_api_v2">>) -> influxdb_api_v2.
|
||||||
|
|
||||||
|
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||||
|
Conf0 = maps:with(PickKeys, Config),
|
||||||
|
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
|
||||||
|
|
||||||
|
schema_keys(Name) ->
|
||||||
|
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].
|
||||||
|
|
||||||
|
bridge_v1_type_names() -> [influxdb, influxdb_api_v1, influxdb_api_v2].
|
|
@ -19,6 +19,10 @@
|
||||||
callback_mode/0,
|
callback_mode/0,
|
||||||
on_start/2,
|
on_start/2,
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
|
on_add_channel/4,
|
||||||
|
on_remove_channel/3,
|
||||||
|
on_get_channel_status/3,
|
||||||
|
on_get_channels/1,
|
||||||
on_query/3,
|
on_query/3,
|
||||||
on_batch_query/3,
|
on_batch_query/3,
|
||||||
on_query_async/4,
|
on_query_async/4,
|
||||||
|
@ -34,6 +38,8 @@
|
||||||
desc/1
|
desc/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([precision_field/0, server_field/0]).
|
||||||
|
|
||||||
%% only for test
|
%% only for test
|
||||||
-export([is_unrecoverable_error/1]).
|
-export([is_unrecoverable_error/1]).
|
||||||
|
|
||||||
|
@ -55,6 +61,35 @@
|
||||||
%% resource callback
|
%% resource callback
|
||||||
callback_mode() -> async_if_possible.
|
callback_mode() -> async_if_possible.
|
||||||
|
|
||||||
|
on_add_channel(
|
||||||
|
_InstanceId,
|
||||||
|
#{channels := Channels} = OldState,
|
||||||
|
ChannelId,
|
||||||
|
#{parameters := Parameters} = ChannelConfig0
|
||||||
|
) ->
|
||||||
|
#{write_syntax := WriteSytaxTmpl} = Parameters,
|
||||||
|
Precision = maps:get(precision, Parameters, ms),
|
||||||
|
ChannelConfig = maps:merge(
|
||||||
|
Parameters,
|
||||||
|
ChannelConfig0#{
|
||||||
|
write_syntax => to_config(WriteSytaxTmpl, Precision)
|
||||||
|
}
|
||||||
|
),
|
||||||
|
{ok, OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)}}.
|
||||||
|
|
||||||
|
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
|
||||||
|
NewState = State#{channels => maps:remove(ChannelId, Channels)},
|
||||||
|
{ok, NewState}.
|
||||||
|
|
||||||
|
on_get_channel_status(InstanceId, _ChannelId, State) ->
|
||||||
|
case on_get_status(InstanceId, State) of
|
||||||
|
connected -> connected;
|
||||||
|
_ -> connecting
|
||||||
|
end.
|
||||||
|
|
||||||
|
on_get_channels(InstanceId) ->
|
||||||
|
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
||||||
|
|
||||||
on_start(InstId, Config) ->
|
on_start(InstId, Config) ->
|
||||||
%% InstID as pool would be handled by influxdb client
|
%% InstID as pool would be handled by influxdb client
|
||||||
%% so there is no need to allocate pool_name here
|
%% so there is no need to allocate pool_name here
|
||||||
|
@ -73,8 +108,9 @@ on_stop(InstId, _State) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client}) ->
|
||||||
case data_to_points(Data, SyntaxLines) of
|
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
|
||||||
|
case data_to_points(Message, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
?tp(
|
?tp(
|
||||||
influxdb_connector_send_query,
|
influxdb_connector_send_query,
|
||||||
|
@ -92,7 +128,9 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
|
||||||
|
|
||||||
%% Once a Batched Data trans to points failed.
|
%% Once a Batched Data trans to points failed.
|
||||||
%% This batch query failed
|
%% This batch query failed
|
||||||
on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
on_batch_query(InstId, BatchData, #{channels := ChannelConf, client := Client}) ->
|
||||||
|
[{Channel, _} | _] = BatchData,
|
||||||
|
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
|
||||||
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
?tp(
|
?tp(
|
||||||
|
@ -110,11 +148,12 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
|
||||||
|
|
||||||
on_query_async(
|
on_query_async(
|
||||||
InstId,
|
InstId,
|
||||||
{send_message, Data},
|
{Channel, Message},
|
||||||
{ReplyFun, Args},
|
{ReplyFun, Args},
|
||||||
_State = #{write_syntax := SyntaxLines, client := Client}
|
#{channels := ChannelConf, client := Client}
|
||||||
) ->
|
) ->
|
||||||
case data_to_points(Data, SyntaxLines) of
|
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
|
||||||
|
case data_to_points(Message, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
?tp(
|
?tp(
|
||||||
influxdb_connector_send_query,
|
influxdb_connector_send_query,
|
||||||
|
@ -134,8 +173,10 @@ on_batch_query_async(
|
||||||
InstId,
|
InstId,
|
||||||
BatchData,
|
BatchData,
|
||||||
{ReplyFun, Args},
|
{ReplyFun, Args},
|
||||||
#{write_syntax := SyntaxLines, client := Client}
|
#{channels := ChannelConf, client := Client}
|
||||||
) ->
|
) ->
|
||||||
|
[{Channel, _} | _] = BatchData,
|
||||||
|
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
|
||||||
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
?tp(
|
?tp(
|
||||||
|
@ -177,30 +218,51 @@ roots() ->
|
||||||
|
|
||||||
fields(common) ->
|
fields(common) ->
|
||||||
[
|
[
|
||||||
{server, server()},
|
server_field(),
|
||||||
{precision,
|
precision_field()
|
||||||
%% The influxdb only supports these 4 precision:
|
|
||||||
%% See "https://github.com/influxdata/influxdb/blob/
|
|
||||||
%% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for
|
|
||||||
%% more information.
|
|
||||||
mk(enum([ns, us, ms, s]), #{
|
|
||||||
required => false, default => ms, desc => ?DESC("precision")
|
|
||||||
})}
|
|
||||||
];
|
];
|
||||||
|
fields("connector_influxdb_api_v1") ->
|
||||||
|
[influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()];
|
||||||
|
fields("connector_influxdb_api_v2") ->
|
||||||
|
[influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()];
|
||||||
fields(influxdb_api_v1) ->
|
fields(influxdb_api_v1) ->
|
||||||
fields(common) ++
|
fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields();
|
||||||
[
|
|
||||||
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
|
|
||||||
{username, mk(binary(), #{desc => ?DESC("username")})},
|
|
||||||
{password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
|
|
||||||
] ++ emqx_connector_schema_lib:ssl_fields();
|
|
||||||
fields(influxdb_api_v2) ->
|
fields(influxdb_api_v2) ->
|
||||||
fields(common) ++
|
fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields().
|
||||||
[
|
|
||||||
{bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
|
influxdb_type_field(Type) ->
|
||||||
{org, mk(binary(), #{required => true, desc => ?DESC("org")})},
|
{influxdb_type, #{
|
||||||
{token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})}
|
required => true,
|
||||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
type => Type,
|
||||||
|
default => Type,
|
||||||
|
desc => ?DESC(atom_to_list(Type))
|
||||||
|
}}.
|
||||||
|
server_field() ->
|
||||||
|
{server, server()}.
|
||||||
|
|
||||||
|
precision_field() ->
|
||||||
|
{precision,
|
||||||
|
%% The influxdb only supports these 4 precision:
|
||||||
|
%% See "https://github.com/influxdata/influxdb/blob/
|
||||||
|
%% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for
|
||||||
|
%% more information.
|
||||||
|
mk(enum([ns, us, ms, s]), #{
|
||||||
|
required => false, default => ms, desc => ?DESC("precision")
|
||||||
|
})}.
|
||||||
|
|
||||||
|
influxdb_api_v1_fields() ->
|
||||||
|
[
|
||||||
|
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
|
||||||
|
{username, mk(binary(), #{desc => ?DESC("username")})},
|
||||||
|
{password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
|
||||||
|
].
|
||||||
|
|
||||||
|
influxdb_api_v2_fields() ->
|
||||||
|
[
|
||||||
|
{bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
|
||||||
|
{org, mk(binary(), #{required => true, desc => ?DESC("org")})},
|
||||||
|
{token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})}
|
||||||
|
].
|
||||||
|
|
||||||
server() ->
|
server() ->
|
||||||
Meta = #{
|
Meta = #{
|
||||||
|
@ -216,6 +278,10 @@ desc(common) ->
|
||||||
desc(influxdb_api_v1) ->
|
desc(influxdb_api_v1) ->
|
||||||
?DESC("influxdb_api_v1");
|
?DESC("influxdb_api_v1");
|
||||||
desc(influxdb_api_v2) ->
|
desc(influxdb_api_v2) ->
|
||||||
|
?DESC("influxdb_api_v2");
|
||||||
|
desc("connector_influxdb_api_v1") ->
|
||||||
|
?DESC("influxdb_api_v1");
|
||||||
|
desc("connector_influxdb_api_v2") ->
|
||||||
?DESC("influxdb_api_v2").
|
?DESC("influxdb_api_v2").
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
@ -248,22 +314,14 @@ start_client(InstId, Config) ->
|
||||||
{error, R}
|
{error, R}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_start_client(
|
do_start_client(InstId, ClientConfig, Config) ->
|
||||||
InstId,
|
|
||||||
ClientConfig,
|
|
||||||
Config = #{write_syntax := Lines}
|
|
||||||
) ->
|
|
||||||
Precision = maps:get(precision, Config, ms),
|
|
||||||
case influxdb:start_client(ClientConfig) of
|
case influxdb:start_client(ClientConfig) of
|
||||||
{ok, Client} ->
|
{ok, Client} ->
|
||||||
case influxdb:is_alive(Client, true) of
|
case influxdb:is_alive(Client, true) of
|
||||||
true ->
|
true ->
|
||||||
case influxdb:check_auth(Client) of
|
case influxdb:check_auth(Client) of
|
||||||
ok ->
|
ok ->
|
||||||
State = #{
|
State = #{client => Client, channels => #{}},
|
||||||
client => Client,
|
|
||||||
write_syntax => to_config(Lines, Precision)
|
|
||||||
},
|
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "starting_influxdb_connector_success",
|
msg => "starting_influxdb_connector_success",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
|
@ -333,23 +391,17 @@ client_config(
|
||||||
] ++ protocol_config(Config).
|
] ++ protocol_config(Config).
|
||||||
|
|
||||||
%% api v1 config
|
%% api v1 config
|
||||||
protocol_config(
|
protocol_config(#{
|
||||||
#{
|
parameters := #{influxdb_type := influxdb_api_v1, database := DB} = Params, ssl := SSL
|
||||||
database := DB,
|
}) ->
|
||||||
ssl := SSL
|
|
||||||
} = Config
|
|
||||||
) ->
|
|
||||||
[
|
[
|
||||||
{protocol, http},
|
{protocol, http},
|
||||||
{version, v1},
|
{version, v1},
|
||||||
{database, str(DB)}
|
{database, str(DB)}
|
||||||
] ++ username(Config) ++
|
] ++ username(Params) ++ password(Params) ++ ssl_config(SSL);
|
||||||
password(Config) ++ ssl_config(SSL);
|
|
||||||
%% api v2 config
|
%% api v2 config
|
||||||
protocol_config(#{
|
protocol_config(#{
|
||||||
bucket := Bucket,
|
parameters := #{influxdb_type := influxdb_api_v2, bucket := Bucket, org := Org, token := Token},
|
||||||
org := Org,
|
|
||||||
token := Token,
|
|
||||||
ssl := SSL
|
ssl := SSL
|
||||||
}) ->
|
}) ->
|
||||||
[
|
[
|
||||||
|
@ -501,7 +553,7 @@ to_maps_config(K, V, Res) ->
|
||||||
%% Tags & Fields Data Trans
|
%% Tags & Fields Data Trans
|
||||||
parse_batch_data(InstId, BatchData, SyntaxLines) ->
|
parse_batch_data(InstId, BatchData, SyntaxLines) ->
|
||||||
{Points, Errors} = lists:foldl(
|
{Points, Errors} = lists:foldl(
|
||||||
fun({send_message, Data}, {ListOfPoints, ErrAccIn}) ->
|
fun({_, Data}, {ListOfPoints, ErrAccIn}) ->
|
||||||
case data_to_points(Data, SyntaxLines) of
|
case data_to_points(Data, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
{[Points | ListOfPoints], ErrAccIn};
|
{[Points | ListOfPoints], ErrAccIn};
|
||||||
|
|
|
@ -34,6 +34,8 @@ resource_type(matrix) ->
|
||||||
emqx_postgresql;
|
emqx_postgresql;
|
||||||
resource_type(mongodb) ->
|
resource_type(mongodb) ->
|
||||||
emqx_bridge_mongodb_connector;
|
emqx_bridge_mongodb_connector;
|
||||||
|
resource_type(influxdb) ->
|
||||||
|
emqx_bridge_influxdb_connector;
|
||||||
resource_type(mysql) ->
|
resource_type(mysql) ->
|
||||||
emqx_bridge_mysql_connector;
|
emqx_bridge_mysql_connector;
|
||||||
resource_type(pgsql) ->
|
resource_type(pgsql) ->
|
||||||
|
@ -112,6 +114,14 @@ connector_structs() ->
|
||||||
required => false
|
required => false
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{influxdb,
|
||||||
|
mk(
|
||||||
|
hoconsc:map(name, ref(emqx_bridge_influxdb, "config_connector")),
|
||||||
|
#{
|
||||||
|
desc => <<"InfluxDB Connector Config">>,
|
||||||
|
required => false
|
||||||
|
}
|
||||||
|
)},
|
||||||
{mysql,
|
{mysql,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
|
hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
|
||||||
|
@ -170,6 +180,7 @@ schema_modules() ->
|
||||||
emqx_bridge_kafka,
|
emqx_bridge_kafka,
|
||||||
emqx_bridge_matrix,
|
emqx_bridge_matrix,
|
||||||
emqx_bridge_mongodb,
|
emqx_bridge_mongodb,
|
||||||
|
emqx_bridge_influxdb,
|
||||||
emqx_bridge_mysql,
|
emqx_bridge_mysql,
|
||||||
emqx_bridge_syskeeper_connector,
|
emqx_bridge_syskeeper_connector,
|
||||||
emqx_bridge_syskeeper_proxy,
|
emqx_bridge_syskeeper_proxy,
|
||||||
|
@ -196,6 +207,7 @@ api_schemas(Method) ->
|
||||||
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
|
||||||
|
api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
|
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
|
||||||
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
|
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
|
||||||
|
|
|
@ -131,6 +131,8 @@ connector_type_to_bridge_types(matrix) ->
|
||||||
[matrix];
|
[matrix];
|
||||||
connector_type_to_bridge_types(mongodb) ->
|
connector_type_to_bridge_types(mongodb) ->
|
||||||
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
|
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
|
||||||
|
connector_type_to_bridge_types(influxdb) ->
|
||||||
|
[influxdb, influxdb_api_v1, influxdb_api_v2];
|
||||||
connector_type_to_bridge_types(mysql) ->
|
connector_type_to_bridge_types(mysql) ->
|
||||||
[mysql];
|
[mysql];
|
||||||
connector_type_to_bridge_types(pgsql) ->
|
connector_type_to_bridge_types(pgsql) ->
|
||||||
|
|
|
@ -47,4 +47,19 @@ Please note that a placeholder for an integer value must be annotated with a suf
|
||||||
write_syntax.label:
|
write_syntax.label:
|
||||||
"""Write Syntax"""
|
"""Write Syntax"""
|
||||||
|
|
||||||
|
action_parameters.label:
|
||||||
|
"""Action Parameters"""
|
||||||
|
action_parameters.desc:
|
||||||
|
"""Additional parameters specific to this action type"""
|
||||||
|
|
||||||
|
influxdb_action.label:
|
||||||
|
"""InfluxDB Action"""
|
||||||
|
influxdb_action.desc:
|
||||||
|
"""Action to interact with a InfluxDB connector"""
|
||||||
|
|
||||||
|
influxdb_parameters.label:
|
||||||
|
"""InfluxdbDB Type Specific Parameters"""
|
||||||
|
influxdb_parameters.desc:
|
||||||
|
"""Set of parameters specific for the given type of this InfluxdbDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`."""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue