diff --git a/apps/emqx_bridge_opents/.gitignore b/apps/emqx_bridge_opents/.gitignore new file mode 100644 index 000000000..f1c455451 --- /dev/null +++ b/apps/emqx_bridge_opents/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/emqx_bridge_opents/BSL.txt b/apps/emqx_bridge_opents/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_opents/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_opents/README.md b/apps/emqx_bridge_opents/README.md new file mode 100644 index 000000000..a172cba15 --- /dev/null +++ b/apps/emqx_bridge_opents/README.md @@ -0,0 +1,9 @@ +emqx_bridge_opentsdb +===== + +An OTP application + +Build +----- + + $ rebar3 compile diff --git a/apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf b/apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_opents/rebar.config b/apps/emqx_bridge_opents/rebar.config new file mode 100644 index 000000000..d7bd4560f --- /dev/null +++ b/apps/emqx_bridge_opents/rebar.config @@ -0,0 +1,8 @@ +{erl_opts, [debug_info]}. + +{deps, [ + {opentsdb, {git, "https://github.com/emqx/opentsdb-client-erl", {tag, "v0.5.1"}}}, + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}} +]}. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src new file mode 100644 index 000000000..d001446b3 --- /dev/null +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -0,0 +1,15 @@ +{application, emqx_bridge_opents, [ + {description, "EMQX Enterprise OpenTSDB Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + opentsdb + ]}, + {env, []}, + {modules, []}, + + {licenses, ["BSL"]}, + {links, []} +]}. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl new file mode 100644 index 000000000..9001e391c --- /dev/null +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -0,0 +1,85 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_opents). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api +conn_bridge_examples(Method) -> + [ + #{ + <<"opents">> => #{ + summary => <<"OpenTSDB Bridge">>, + value => values(Method) + } + } + ]. + +values(_Method) -> + #{ + enable => true, + type => opents, + name => <<"foo">>, + server => <<"http://127.0.0.1:4242">>, + pool_size => 8, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_opents". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})} + ] ++ emqx_resource_schema:fields("resource_opts") ++ + emqx_ee_connector_opents:fields(config); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal + +type_field() -> + {type, mk(enum([opents]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 440889d02..7dc8882b3 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -8,7 +8,8 @@ emqx_ee_connector, telemetry, emqx_bridge_kafka, - emqx_bridge_gcp_pubsub + emqx_bridge_gcp_pubsub, + emqx_bridge_opents ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 7fdfbba99..636166d90 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -35,7 +35,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_clickhouse, Method), ref(emqx_ee_bridge_dynamo, Method), ref(emqx_ee_bridge_rocketmq, Method), - ref(emqx_ee_bridge_sqlserver, Method) + ref(emqx_ee_bridge_sqlserver, Method), + ref(emqx_bridge_opents, Method) ]. schema_modules() -> @@ -55,7 +56,8 @@ schema_modules() -> emqx_ee_bridge_clickhouse, emqx_ee_bridge_dynamo, emqx_ee_bridge_rocketmq, - emqx_ee_bridge_sqlserver + emqx_ee_bridge_sqlserver, + emqx_bridge_opents ]. examples(Method) -> @@ -94,7 +96,8 @@ resource_type(tdengine) -> emqx_ee_connector_tdengine; resource_type(clickhouse) -> emqx_ee_connector_clickhouse; resource_type(dynamo) -> emqx_ee_connector_dynamo; resource_type(rocketmq) -> emqx_ee_connector_rocketmq; -resource_type(sqlserver) -> emqx_ee_connector_sqlserver. +resource_type(sqlserver) -> emqx_ee_connector_sqlserver; +resource_type(opents) -> emqx_ee_connector_opents. fields(bridges) -> [ @@ -153,6 +156,14 @@ fields(bridges) -> desc => <<"Cassandra Bridge Config">>, required => false } + )}, + {opents, + mk( + hoconsc:map(name, ref(emqx_bridge_opents, "config")), + #{ + desc => <<"OpenTSDB Bridge Config">>, + required => false + } )} ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs(). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl new file mode 100644 index 000000000..457fde0a0 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl @@ -0,0 +1,182 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_opents). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-export([connect/1]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +%%===================================================================== +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {server, mk(binary(), #{required => true, desc => ?DESC("server")})}, + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, + {summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})}, + {details, mk(boolean(), #{default => false, desc => ?DESC("details")})}, + {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} + ]. + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> always_sync. + +is_buffer_supported() -> false. + +on_start( + InstanceId, + #{ + server := Server, + pool_size := PoolSize, + summary := Summary, + details := Details, + resource_opts := #{batch_size := BatchSize} + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_opents_connector", + connector => InstanceId, + config => emqx_utils:redact(Config) + }), + + Options = [ + {server, to_str(Server)}, + {summary, Summary}, + {details, Details}, + {max_batch_size, BatchSize}, + {pool_size, PoolSize} + ], + + State = #{poolname => InstanceId, server => Server}, + case opentsdb_connectivity(Server) of + ok -> + case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of + ok -> + {ok, State}; + Error -> + Error + end; + {error, Reason} = Error -> + ?SLOG(error, #{msg => "Initiate resource failed", reason => Reason}), + Error + end. + +on_stop(InstanceId, #{poolname := PoolName} = _State) -> + ?SLOG(info, #{ + msg => "stopping_opents_connector", + connector => InstanceId + }), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstanceId, Request, State) -> + on_batch_query(InstanceId, [Request], State). + +on_batch_query( + InstanceId, + BatchReq, + State +) -> + Datas = [format_opentsdb_msg(Msg) || {_Key, Msg} <- BatchReq], + do_query(InstanceId, Datas, State). + +on_get_status(_InstanceId, #{server := Server}) -> + case opentsdb_connectivity(Server) of + ok -> + connected; + {error, Reason} -> + ?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}), + connecting + end. + +%%======================================================================================== +%% Helper fns +%%======================================================================================== + +do_query(InstanceId, Query, #{poolname := PoolName} = State) -> + ?TRACE( + "QUERY", + "opents_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover), + + case Result of + {error, Reason} -> + ?tp( + opents_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "opents_connector_do_query_failed", + connector => InstanceId, + query => Query, + reason => Reason + }), + Result; + _ -> + ?tp( + opents_connector_query_return, + #{result => Result} + ), + Result + end. + +connect(Opts) -> + opentsdb:start_link(Opts). + +to_str(List) when is_list(List) -> + List; +to_str(Bin) when is_binary(Bin) -> + erlang:binary_to_list(Bin). + +opentsdb_connectivity(Server) -> + SvrUrl = + case Server of + <<"http://", _/binary>> -> Server; + <<"https://", _/binary>> -> Server; + _ -> "http://" ++ Server + end, + emqx_plugin_libs_rule:http_connectivity(SvrUrl). + +format_opentsdb_msg(Msg) -> + maps:with( + [ + timestamp, + metric, + tags, + value, + <<"timestamp">>, + <<"metric">>, + <<"tags">>, + <<"value">> + ], + Msg + ). diff --git a/rel/i18n/emqx_bridge_opents.hocon b/rel/i18n/emqx_bridge_opents.hocon new file mode 100644 index 000000000..ff44a9e18 --- /dev/null +++ b/rel/i18n/emqx_bridge_opents.hocon @@ -0,0 +1,26 @@ +emqx_bridge_opents { + + config_enable.desc: + """Enable or disable this bridge""" + + config_enable.label: + "Enable Or Disable Bridge" + + desc_config.desc: + """Configuration for an OpenTSDB bridge.""" + + desc_config.label: + "OpenTSDB Bridge Configuration" + + desc_type.desc: + """The Bridge Type""" + + desc_type.label: + "Bridge Type" + + desc_name.desc: + """Bridge name.""" + + desc_name.label: + "Bridge Name" +} diff --git a/rel/i18n/emqx_ee_connector_opents.hocon b/rel/i18n/emqx_ee_connector_opents.hocon new file mode 100644 index 000000000..4e51454c9 --- /dev/null +++ b/rel/i18n/emqx_ee_connector_opents.hocon @@ -0,0 +1,20 @@ +emqx_ee_connector_opents { + + server.desc: + """The URL of OpenTSDB endpoint.""" + + server.label: + "URL" + + summary.desc: + """Whether or not to return summary information.""" + + summary.label: + "Summary" + + details.desc: + """Whether or not to return detailed information.""" + + details.label: + "Details" +} diff --git a/rel/i18n/zh/emqx_bridge_opents.hocon b/rel/i18n/zh/emqx_bridge_opents.hocon new file mode 100644 index 000000000..137e687df --- /dev/null +++ b/rel/i18n/zh/emqx_bridge_opents.hocon @@ -0,0 +1,26 @@ +emqx_bridge_opents { + + config_enable.desc: + """启用/禁用桥接""" + + config_enable.label: + "启用/禁用桥接" + + desc_config.desc: + """OpenTSDB 桥接配置""" + + desc_config.label: + "OpenTSDB 桥接配置" + + desc_type.desc: + """Bridge 类型""" + + desc_type.label: + "桥接类型" + + desc_name.desc: + """桥接名字""" + + desc_name.label: + "桥接名字" +} diff --git a/rel/i18n/zh/emqx_ee_connector_opents.hocon b/rel/i18n/zh/emqx_ee_connector_opents.hocon new file mode 100644 index 000000000..7e58da9bd --- /dev/null +++ b/rel/i18n/zh/emqx_ee_connector_opents.hocon @@ -0,0 +1,20 @@ +emqx_ee_connector_opents { + + server.desc: + """服务器的地址。""" + + server.label: + "服务器地址" + + summary.desc: + """是否返回摘要信息。""" + + summary.label: + "摘要信息" + + details.desc: + """是否返回详细信息。""" + + details.label: + "详细信息" +}