feat(opents): OpenTSDB integration
This commit is contained in:
parent
34afa16236
commit
5074825075
|
@ -0,0 +1,19 @@
|
|||
.rebar3
|
||||
_*
|
||||
.eunit
|
||||
*.o
|
||||
*.beam
|
||||
*.plt
|
||||
*.swp
|
||||
*.swo
|
||||
.erlang.cookie
|
||||
ebin
|
||||
log
|
||||
erl_crash.dump
|
||||
.rebar
|
||||
logs
|
||||
_build
|
||||
.idea
|
||||
*.iml
|
||||
rebar3.crashdump
|
||||
*~
|
|
@ -0,0 +1,94 @@
|
|||
Business Source License 1.1
|
||||
|
||||
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||
Licensed Work: EMQX Enterprise Edition
|
||||
The Licensed Work is (c) 2023
|
||||
Hangzhou EMQ Technologies Co., Ltd.
|
||||
Additional Use Grant: Students and educators are granted right to copy,
|
||||
modify, and create derivative work for research
|
||||
or education.
|
||||
Change Date: 2027-02-01
|
||||
Change License: Apache License, Version 2.0
|
||||
|
||||
For information about alternative licensing arrangements for the Software,
|
||||
please contact Licensor: https://www.emqx.com/en/contact
|
||||
|
||||
Notice
|
||||
|
||||
The Business Source License (this document, or the “License”) is not an Open
|
||||
Source license. However, the Licensed Work will eventually be made available
|
||||
under an Open Source License, as stated in this License.
|
||||
|
||||
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
|
||||
“Business Source License” is a trademark of MariaDB Corporation Ab.
|
||||
|
||||
-----------------------------------------------------------------------------
|
||||
|
||||
Business Source License 1.1
|
||||
|
||||
Terms
|
||||
|
||||
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||
works, redistribute, and make non-production use of the Licensed Work. The
|
||||
Licensor may make an Additional Use Grant, above, permitting limited
|
||||
production use.
|
||||
|
||||
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||
available distribution of a specific version of the Licensed Work under this
|
||||
License, whichever comes first, the Licensor hereby grants you rights under
|
||||
the terms of the Change License, and the rights granted in the paragraph
|
||||
above terminate.
|
||||
|
||||
If your use of the Licensed Work does not comply with the requirements
|
||||
currently in effect as described in this License, you must purchase a
|
||||
commercial license from the Licensor, its affiliated entities, or authorized
|
||||
resellers, or you must refrain from using the Licensed Work.
|
||||
|
||||
All copies of the original and modified Licensed Work, and derivative works
|
||||
of the Licensed Work, are subject to this License. This License applies
|
||||
separately for each version of the Licensed Work and the Change Date may vary
|
||||
for each version of the Licensed Work released by Licensor.
|
||||
|
||||
You must conspicuously display this License on each original or modified copy
|
||||
of the Licensed Work. If you receive the Licensed Work in original or
|
||||
modified form from a third party, the terms and conditions set forth in this
|
||||
License apply to your use of that work.
|
||||
|
||||
Any use of the Licensed Work in violation of this License will automatically
|
||||
terminate your rights under this License for the current and all other
|
||||
versions of the Licensed Work.
|
||||
|
||||
This License does not grant you any right in any trademark or logo of
|
||||
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||
Licensor as expressly required by this License).
|
||||
|
||||
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||
TITLE.
|
||||
|
||||
MariaDB hereby grants you permission to use this License’s text to license
|
||||
your works, and to refer to it using the trademark “Business Source License”,
|
||||
as long as you comply with the Covenants of Licensor below.
|
||||
|
||||
Covenants of Licensor
|
||||
|
||||
In consideration of the right to use this License’s text and the “Business
|
||||
Source License” name and trademark, Licensor covenants to MariaDB, and to all
|
||||
other recipients of the licensed work to be provided by Licensor:
|
||||
|
||||
1. To specify as the Change License the GPL Version 2.0 or any later version,
|
||||
or a license that is compatible with GPL Version 2.0 or a later version,
|
||||
where “compatible” means that software provided under the Change License can
|
||||
be included in a program with software provided under GPL Version 2.0 or a
|
||||
later version. Licensor may specify additional Change Licenses without
|
||||
limitation.
|
||||
|
||||
2. To either: (a) specify an additional grant of rights to use that does not
|
||||
impose any additional restriction on the right granted in this License, as
|
||||
the Additional Use Grant; or (b) insert the text “None”.
|
||||
|
||||
3. To specify a Change Date.
|
||||
|
||||
4. Not to modify this License in any other way.
|
|
@ -0,0 +1,9 @@
|
|||
emqx_bridge_opentsdb
|
||||
=====
|
||||
|
||||
An OTP application
|
||||
|
||||
Build
|
||||
-----
|
||||
|
||||
$ rebar3 compile
|
|
@ -0,0 +1,8 @@
|
|||
{erl_opts, [debug_info]}.
|
||||
|
||||
{deps, [
|
||||
{opentsdb, {git, "https://github.com/emqx/opentsdb-client-erl", {tag, "v0.5.1"}}},
|
||||
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
||||
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
||||
]}.
|
|
@ -0,0 +1,15 @@
|
|||
{application, emqx_bridge_opents, [
|
||||
{description, "EMQX Enterprise OpenTSDB Bridge"},
|
||||
{vsn, "0.1.0"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
opentsdb
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
||||
{licenses, ["BSL"]},
|
||||
{links, []}
|
||||
]}.
|
|
@ -0,0 +1,85 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_opents).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% api
|
||||
conn_bridge_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"opents">> => #{
|
||||
summary => <<"OpenTSDB Bridge">>,
|
||||
value => values(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
values(_Method) ->
|
||||
#{
|
||||
enable => true,
|
||||
type => opents,
|
||||
name => <<"foo">>,
|
||||
server => <<"http://127.0.0.1:4242">>,
|
||||
pool_size => 8,
|
||||
resource_opts => #{
|
||||
worker_pool_size => 1,
|
||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||
batch_time => ?DEFAULT_BATCH_TIME,
|
||||
query_mode => async,
|
||||
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||
}
|
||||
}.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Hocon Schema Definitions
|
||||
namespace() -> "bridge_opents".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}
|
||||
] ++ emqx_resource_schema:fields("resource_opts") ++
|
||||
emqx_ee_connector_opents:fields(config);
|
||||
fields("post") ->
|
||||
[type_field(), name_field() | fields("config")];
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% internal
|
||||
|
||||
type_field() ->
|
||||
{type, mk(enum([opents]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||
|
||||
name_field() ->
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
|
@ -8,7 +8,8 @@
|
|||
emqx_ee_connector,
|
||||
telemetry,
|
||||
emqx_bridge_kafka,
|
||||
emqx_bridge_gcp_pubsub
|
||||
emqx_bridge_gcp_pubsub,
|
||||
emqx_bridge_opents
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
|
|
@ -35,7 +35,8 @@ api_schemas(Method) ->
|
|||
ref(emqx_ee_bridge_clickhouse, Method),
|
||||
ref(emqx_ee_bridge_dynamo, Method),
|
||||
ref(emqx_ee_bridge_rocketmq, Method),
|
||||
ref(emqx_ee_bridge_sqlserver, Method)
|
||||
ref(emqx_ee_bridge_sqlserver, Method),
|
||||
ref(emqx_bridge_opents, Method)
|
||||
].
|
||||
|
||||
schema_modules() ->
|
||||
|
@ -55,7 +56,8 @@ schema_modules() ->
|
|||
emqx_ee_bridge_clickhouse,
|
||||
emqx_ee_bridge_dynamo,
|
||||
emqx_ee_bridge_rocketmq,
|
||||
emqx_ee_bridge_sqlserver
|
||||
emqx_ee_bridge_sqlserver,
|
||||
emqx_bridge_opents
|
||||
].
|
||||
|
||||
examples(Method) ->
|
||||
|
@ -94,7 +96,8 @@ resource_type(tdengine) -> emqx_ee_connector_tdengine;
|
|||
resource_type(clickhouse) -> emqx_ee_connector_clickhouse;
|
||||
resource_type(dynamo) -> emqx_ee_connector_dynamo;
|
||||
resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
|
||||
resource_type(sqlserver) -> emqx_ee_connector_sqlserver.
|
||||
resource_type(sqlserver) -> emqx_ee_connector_sqlserver;
|
||||
resource_type(opents) -> emqx_ee_connector_opents.
|
||||
|
||||
fields(bridges) ->
|
||||
[
|
||||
|
@ -153,6 +156,14 @@ fields(bridges) ->
|
|||
desc => <<"Cassandra Bridge Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{opents,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_opents, "config")),
|
||||
#{
|
||||
desc => <<"OpenTSDB Bridge Config">>,
|
||||
required => false
|
||||
}
|
||||
)}
|
||||
] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
|
||||
pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs().
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ee_connector_opents).
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
-export([roots/0, fields/1]).
|
||||
|
||||
%% `emqx_resource' API
|
||||
-export([
|
||||
callback_mode/0,
|
||||
is_buffer_supported/0,
|
||||
on_start/2,
|
||||
on_stop/2,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
]).
|
||||
|
||||
-export([connect/1]).
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
%%=====================================================================
|
||||
%% Hocon schema
|
||||
roots() ->
|
||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||
|
||||
fields(config) ->
|
||||
[
|
||||
{server, mk(binary(), #{required => true, desc => ?DESC("server")})},
|
||||
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
|
||||
{summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})},
|
||||
{details, mk(boolean(), #{default => false, desc => ?DESC("details")})},
|
||||
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
|
||||
].
|
||||
|
||||
%%========================================================================================
|
||||
%% `emqx_resource' API
|
||||
%%========================================================================================
|
||||
|
||||
callback_mode() -> always_sync.
|
||||
|
||||
is_buffer_supported() -> false.
|
||||
|
||||
on_start(
|
||||
InstanceId,
|
||||
#{
|
||||
server := Server,
|
||||
pool_size := PoolSize,
|
||||
summary := Summary,
|
||||
details := Details,
|
||||
resource_opts := #{batch_size := BatchSize}
|
||||
} = Config
|
||||
) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_opents_connector",
|
||||
connector => InstanceId,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
|
||||
Options = [
|
||||
{server, to_str(Server)},
|
||||
{summary, Summary},
|
||||
{details, Details},
|
||||
{max_batch_size, BatchSize},
|
||||
{pool_size, PoolSize}
|
||||
],
|
||||
|
||||
State = #{poolname => InstanceId, server => Server},
|
||||
case opentsdb_connectivity(Server) of
|
||||
ok ->
|
||||
case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
Error ->
|
||||
Error
|
||||
end;
|
||||
{error, Reason} = Error ->
|
||||
?SLOG(error, #{msg => "Initiate resource failed", reason => Reason}),
|
||||
Error
|
||||
end.
|
||||
|
||||
on_stop(InstanceId, #{poolname := PoolName} = _State) ->
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_opents_connector",
|
||||
connector => InstanceId
|
||||
}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
on_query(InstanceId, Request, State) ->
|
||||
on_batch_query(InstanceId, [Request], State).
|
||||
|
||||
on_batch_query(
|
||||
InstanceId,
|
||||
BatchReq,
|
||||
State
|
||||
) ->
|
||||
Datas = [format_opentsdb_msg(Msg) || {_Key, Msg} <- BatchReq],
|
||||
do_query(InstanceId, Datas, State).
|
||||
|
||||
on_get_status(_InstanceId, #{server := Server}) ->
|
||||
case opentsdb_connectivity(Server) of
|
||||
ok ->
|
||||
connected;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}),
|
||||
connecting
|
||||
end.
|
||||
|
||||
%%========================================================================================
|
||||
%% Helper fns
|
||||
%%========================================================================================
|
||||
|
||||
do_query(InstanceId, Query, #{poolname := PoolName} = State) ->
|
||||
?TRACE(
|
||||
"QUERY",
|
||||
"opents_connector_received",
|
||||
#{connector => InstanceId, query => Query, state => State}
|
||||
),
|
||||
Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover),
|
||||
|
||||
case Result of
|
||||
{error, Reason} ->
|
||||
?tp(
|
||||
opents_connector_query_return,
|
||||
#{error => Reason}
|
||||
),
|
||||
?SLOG(error, #{
|
||||
msg => "opents_connector_do_query_failed",
|
||||
connector => InstanceId,
|
||||
query => Query,
|
||||
reason => Reason
|
||||
}),
|
||||
Result;
|
||||
_ ->
|
||||
?tp(
|
||||
opents_connector_query_return,
|
||||
#{result => Result}
|
||||
),
|
||||
Result
|
||||
end.
|
||||
|
||||
connect(Opts) ->
|
||||
opentsdb:start_link(Opts).
|
||||
|
||||
to_str(List) when is_list(List) ->
|
||||
List;
|
||||
to_str(Bin) when is_binary(Bin) ->
|
||||
erlang:binary_to_list(Bin).
|
||||
|
||||
opentsdb_connectivity(Server) ->
|
||||
SvrUrl =
|
||||
case Server of
|
||||
<<"http://", _/binary>> -> Server;
|
||||
<<"https://", _/binary>> -> Server;
|
||||
_ -> "http://" ++ Server
|
||||
end,
|
||||
emqx_plugin_libs_rule:http_connectivity(SvrUrl).
|
||||
|
||||
format_opentsdb_msg(Msg) ->
|
||||
maps:with(
|
||||
[
|
||||
timestamp,
|
||||
metric,
|
||||
tags,
|
||||
value,
|
||||
<<"timestamp">>,
|
||||
<<"metric">>,
|
||||
<<"tags">>,
|
||||
<<"value">>
|
||||
],
|
||||
Msg
|
||||
).
|
|
@ -0,0 +1,26 @@
|
|||
emqx_bridge_opents {
|
||||
|
||||
config_enable.desc:
|
||||
"""Enable or disable this bridge"""
|
||||
|
||||
config_enable.label:
|
||||
"Enable Or Disable Bridge"
|
||||
|
||||
desc_config.desc:
|
||||
"""Configuration for an OpenTSDB bridge."""
|
||||
|
||||
desc_config.label:
|
||||
"OpenTSDB Bridge Configuration"
|
||||
|
||||
desc_type.desc:
|
||||
"""The Bridge Type"""
|
||||
|
||||
desc_type.label:
|
||||
"Bridge Type"
|
||||
|
||||
desc_name.desc:
|
||||
"""Bridge name."""
|
||||
|
||||
desc_name.label:
|
||||
"Bridge Name"
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
emqx_ee_connector_opents {
|
||||
|
||||
server.desc:
|
||||
"""The URL of OpenTSDB endpoint."""
|
||||
|
||||
server.label:
|
||||
"URL"
|
||||
|
||||
summary.desc:
|
||||
"""Whether or not to return summary information."""
|
||||
|
||||
summary.label:
|
||||
"Summary"
|
||||
|
||||
details.desc:
|
||||
"""Whether or not to return detailed information."""
|
||||
|
||||
details.label:
|
||||
"Details"
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
emqx_bridge_opents {
|
||||
|
||||
config_enable.desc:
|
||||
"""启用/禁用桥接"""
|
||||
|
||||
config_enable.label:
|
||||
"启用/禁用桥接"
|
||||
|
||||
desc_config.desc:
|
||||
"""OpenTSDB 桥接配置"""
|
||||
|
||||
desc_config.label:
|
||||
"OpenTSDB 桥接配置"
|
||||
|
||||
desc_type.desc:
|
||||
"""Bridge 类型"""
|
||||
|
||||
desc_type.label:
|
||||
"桥接类型"
|
||||
|
||||
desc_name.desc:
|
||||
"""桥接名字"""
|
||||
|
||||
desc_name.label:
|
||||
"桥接名字"
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
emqx_ee_connector_opents {
|
||||
|
||||
server.desc:
|
||||
"""服务器的地址。"""
|
||||
|
||||
server.label:
|
||||
"服务器地址"
|
||||
|
||||
summary.desc:
|
||||
"""是否返回摘要信息。"""
|
||||
|
||||
summary.label:
|
||||
"摘要信息"
|
||||
|
||||
details.desc:
|
||||
"""是否返回详细信息。"""
|
||||
|
||||
details.label:
|
||||
"详细信息"
|
||||
}
|
Loading…
Reference in New Issue