From 5074825075363047dacba56f11bd46ed166d1f31 Mon Sep 17 00:00:00 2001 From: firest Date: Sun, 23 Apr 2023 09:56:24 +0800 Subject: [PATCH 1/6] feat(opents): OpenTSDB integration --- apps/emqx_bridge_opents/.gitignore | 19 ++ apps/emqx_bridge_opents/BSL.txt | 94 +++++++++ apps/emqx_bridge_opents/README.md | 9 + .../etc/emqx_bridge_opents.conf | 0 apps/emqx_bridge_opents/rebar.config | 8 + .../src/emqx_bridge_opents.app.src | 15 ++ .../src/emqx_bridge_opents.erl | 85 ++++++++ .../emqx_ee_bridge/src/emqx_ee_bridge.app.src | 3 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 17 +- .../src/emqx_ee_connector_opents.erl | 182 ++++++++++++++++++ rel/i18n/emqx_bridge_opents.hocon | 26 +++ rel/i18n/emqx_ee_connector_opents.hocon | 20 ++ rel/i18n/zh/emqx_bridge_opents.hocon | 26 +++ rel/i18n/zh/emqx_ee_connector_opents.hocon | 20 ++ 14 files changed, 520 insertions(+), 4 deletions(-) create mode 100644 apps/emqx_bridge_opents/.gitignore create mode 100644 apps/emqx_bridge_opents/BSL.txt create mode 100644 apps/emqx_bridge_opents/README.md create mode 100644 apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf create mode 100644 apps/emqx_bridge_opents/rebar.config create mode 100644 apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src create mode 100644 apps/emqx_bridge_opents/src/emqx_bridge_opents.erl create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl create mode 100644 rel/i18n/emqx_bridge_opents.hocon create mode 100644 rel/i18n/emqx_ee_connector_opents.hocon create mode 100644 rel/i18n/zh/emqx_bridge_opents.hocon create mode 100644 rel/i18n/zh/emqx_ee_connector_opents.hocon diff --git a/apps/emqx_bridge_opents/.gitignore b/apps/emqx_bridge_opents/.gitignore new file mode 100644 index 000000000..f1c455451 --- /dev/null +++ b/apps/emqx_bridge_opents/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/emqx_bridge_opents/BSL.txt b/apps/emqx_bridge_opents/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_opents/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_opents/README.md b/apps/emqx_bridge_opents/README.md new file mode 100644 index 000000000..a172cba15 --- /dev/null +++ b/apps/emqx_bridge_opents/README.md @@ -0,0 +1,9 @@ +emqx_bridge_opentsdb +===== + +An OTP application + +Build +----- + + $ rebar3 compile diff --git a/apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf b/apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_opents/rebar.config b/apps/emqx_bridge_opents/rebar.config new file mode 100644 index 000000000..d7bd4560f --- /dev/null +++ b/apps/emqx_bridge_opents/rebar.config @@ -0,0 +1,8 @@ +{erl_opts, [debug_info]}. + +{deps, [ + {opentsdb, {git, "https://github.com/emqx/opentsdb-client-erl", {tag, "v0.5.1"}}}, + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}} +]}. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src new file mode 100644 index 000000000..d001446b3 --- /dev/null +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -0,0 +1,15 @@ +{application, emqx_bridge_opents, [ + {description, "EMQX Enterprise OpenTSDB Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + opentsdb + ]}, + {env, []}, + {modules, []}, + + {licenses, ["BSL"]}, + {links, []} +]}. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl new file mode 100644 index 000000000..9001e391c --- /dev/null +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -0,0 +1,85 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_opents). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api +conn_bridge_examples(Method) -> + [ + #{ + <<"opents">> => #{ + summary => <<"OpenTSDB Bridge">>, + value => values(Method) + } + } + ]. + +values(_Method) -> + #{ + enable => true, + type => opents, + name => <<"foo">>, + server => <<"http://127.0.0.1:4242">>, + pool_size => 8, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_opents". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})} + ] ++ emqx_resource_schema:fields("resource_opts") ++ + emqx_ee_connector_opents:fields(config); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal + +type_field() -> + {type, mk(enum([opents]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 440889d02..7dc8882b3 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -8,7 +8,8 @@ emqx_ee_connector, telemetry, emqx_bridge_kafka, - emqx_bridge_gcp_pubsub + emqx_bridge_gcp_pubsub, + emqx_bridge_opents ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 7fdfbba99..636166d90 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -35,7 +35,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_clickhouse, Method), ref(emqx_ee_bridge_dynamo, Method), ref(emqx_ee_bridge_rocketmq, Method), - ref(emqx_ee_bridge_sqlserver, Method) + ref(emqx_ee_bridge_sqlserver, Method), + ref(emqx_bridge_opents, Method) ]. schema_modules() -> @@ -55,7 +56,8 @@ schema_modules() -> emqx_ee_bridge_clickhouse, emqx_ee_bridge_dynamo, emqx_ee_bridge_rocketmq, - emqx_ee_bridge_sqlserver + emqx_ee_bridge_sqlserver, + emqx_bridge_opents ]. examples(Method) -> @@ -94,7 +96,8 @@ resource_type(tdengine) -> emqx_ee_connector_tdengine; resource_type(clickhouse) -> emqx_ee_connector_clickhouse; resource_type(dynamo) -> emqx_ee_connector_dynamo; resource_type(rocketmq) -> emqx_ee_connector_rocketmq; -resource_type(sqlserver) -> emqx_ee_connector_sqlserver. +resource_type(sqlserver) -> emqx_ee_connector_sqlserver; +resource_type(opents) -> emqx_ee_connector_opents. fields(bridges) -> [ @@ -153,6 +156,14 @@ fields(bridges) -> desc => <<"Cassandra Bridge Config">>, required => false } + )}, + {opents, + mk( + hoconsc:map(name, ref(emqx_bridge_opents, "config")), + #{ + desc => <<"OpenTSDB Bridge Config">>, + required => false + } )} ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs(). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl new file mode 100644 index 000000000..457fde0a0 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl @@ -0,0 +1,182 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_opents). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-export([connect/1]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +%%===================================================================== +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {server, mk(binary(), #{required => true, desc => ?DESC("server")})}, + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, + {summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})}, + {details, mk(boolean(), #{default => false, desc => ?DESC("details")})}, + {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} + ]. + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> always_sync. + +is_buffer_supported() -> false. + +on_start( + InstanceId, + #{ + server := Server, + pool_size := PoolSize, + summary := Summary, + details := Details, + resource_opts := #{batch_size := BatchSize} + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_opents_connector", + connector => InstanceId, + config => emqx_utils:redact(Config) + }), + + Options = [ + {server, to_str(Server)}, + {summary, Summary}, + {details, Details}, + {max_batch_size, BatchSize}, + {pool_size, PoolSize} + ], + + State = #{poolname => InstanceId, server => Server}, + case opentsdb_connectivity(Server) of + ok -> + case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of + ok -> + {ok, State}; + Error -> + Error + end; + {error, Reason} = Error -> + ?SLOG(error, #{msg => "Initiate resource failed", reason => Reason}), + Error + end. + +on_stop(InstanceId, #{poolname := PoolName} = _State) -> + ?SLOG(info, #{ + msg => "stopping_opents_connector", + connector => InstanceId + }), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstanceId, Request, State) -> + on_batch_query(InstanceId, [Request], State). + +on_batch_query( + InstanceId, + BatchReq, + State +) -> + Datas = [format_opentsdb_msg(Msg) || {_Key, Msg} <- BatchReq], + do_query(InstanceId, Datas, State). + +on_get_status(_InstanceId, #{server := Server}) -> + case opentsdb_connectivity(Server) of + ok -> + connected; + {error, Reason} -> + ?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}), + connecting + end. + +%%======================================================================================== +%% Helper fns +%%======================================================================================== + +do_query(InstanceId, Query, #{poolname := PoolName} = State) -> + ?TRACE( + "QUERY", + "opents_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover), + + case Result of + {error, Reason} -> + ?tp( + opents_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "opents_connector_do_query_failed", + connector => InstanceId, + query => Query, + reason => Reason + }), + Result; + _ -> + ?tp( + opents_connector_query_return, + #{result => Result} + ), + Result + end. + +connect(Opts) -> + opentsdb:start_link(Opts). + +to_str(List) when is_list(List) -> + List; +to_str(Bin) when is_binary(Bin) -> + erlang:binary_to_list(Bin). + +opentsdb_connectivity(Server) -> + SvrUrl = + case Server of + <<"http://", _/binary>> -> Server; + <<"https://", _/binary>> -> Server; + _ -> "http://" ++ Server + end, + emqx_plugin_libs_rule:http_connectivity(SvrUrl). + +format_opentsdb_msg(Msg) -> + maps:with( + [ + timestamp, + metric, + tags, + value, + <<"timestamp">>, + <<"metric">>, + <<"tags">>, + <<"value">> + ], + Msg + ). diff --git a/rel/i18n/emqx_bridge_opents.hocon b/rel/i18n/emqx_bridge_opents.hocon new file mode 100644 index 000000000..ff44a9e18 --- /dev/null +++ b/rel/i18n/emqx_bridge_opents.hocon @@ -0,0 +1,26 @@ +emqx_bridge_opents { + + config_enable.desc: + """Enable or disable this bridge""" + + config_enable.label: + "Enable Or Disable Bridge" + + desc_config.desc: + """Configuration for an OpenTSDB bridge.""" + + desc_config.label: + "OpenTSDB Bridge Configuration" + + desc_type.desc: + """The Bridge Type""" + + desc_type.label: + "Bridge Type" + + desc_name.desc: + """Bridge name.""" + + desc_name.label: + "Bridge Name" +} diff --git a/rel/i18n/emqx_ee_connector_opents.hocon b/rel/i18n/emqx_ee_connector_opents.hocon new file mode 100644 index 000000000..4e51454c9 --- /dev/null +++ b/rel/i18n/emqx_ee_connector_opents.hocon @@ -0,0 +1,20 @@ +emqx_ee_connector_opents { + + server.desc: + """The URL of OpenTSDB endpoint.""" + + server.label: + "URL" + + summary.desc: + """Whether or not to return summary information.""" + + summary.label: + "Summary" + + details.desc: + """Whether or not to return detailed information.""" + + details.label: + "Details" +} diff --git a/rel/i18n/zh/emqx_bridge_opents.hocon b/rel/i18n/zh/emqx_bridge_opents.hocon new file mode 100644 index 000000000..137e687df --- /dev/null +++ b/rel/i18n/zh/emqx_bridge_opents.hocon @@ -0,0 +1,26 @@ +emqx_bridge_opents { + + config_enable.desc: + """启用/禁用桥接""" + + config_enable.label: + "启用/禁用桥接" + + desc_config.desc: + """OpenTSDB 桥接配置""" + + desc_config.label: + "OpenTSDB 桥接配置" + + desc_type.desc: + """Bridge 类型""" + + desc_type.label: + "桥接类型" + + desc_name.desc: + """桥接名字""" + + desc_name.label: + "桥接名字" +} diff --git a/rel/i18n/zh/emqx_ee_connector_opents.hocon b/rel/i18n/zh/emqx_ee_connector_opents.hocon new file mode 100644 index 000000000..7e58da9bd --- /dev/null +++ b/rel/i18n/zh/emqx_ee_connector_opents.hocon @@ -0,0 +1,20 @@ +emqx_ee_connector_opents { + + server.desc: + """服务器的地址。""" + + server.label: + "服务器地址" + + summary.desc: + """是否返回摘要信息。""" + + summary.label: + "摘要信息" + + details.desc: + """是否返回详细信息。""" + + details.label: + "详细信息" +} From 0b46acda87716f89cff6ee1e5ec273bf6c11873e Mon Sep 17 00:00:00 2001 From: firest Date: Sun, 23 Apr 2023 09:57:47 +0800 Subject: [PATCH 2/6] test(opents): add test cases for OpenTSDB --- .ci/docker-compose-file/.env | 1 + .../docker-compose-opents.yaml | 9 + .../docker-compose-toxiproxy.yaml | 1 + .ci/docker-compose-file/toxiproxy.json | 6 + .github/workflows/run_test_cases.yaml | 1 + apps/emqx_bridge_opents/.gitignore | 19 - apps/emqx_bridge_opents/docker-ct | 2 + .../etc/emqx_bridge_opents.conf | 0 .../test/emqx_bridge_opents_SUITE.erl | 363 ++++++++++++++++++ .../src/emqx_ee_connector_opents.erl | 16 +- mix.exs | 5 +- rebar.config.erl | 2 + ...con => emqx_bridge_opents_connector.hocon} | 2 +- scripts/ct/run.sh | 9 +- scripts/find-apps.sh | 3 + 15 files changed, 410 insertions(+), 29 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-opents.yaml delete mode 100644 apps/emqx_bridge_opents/.gitignore create mode 100644 apps/emqx_bridge_opents/docker-ct delete mode 100644 apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf create mode 100644 apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl rename rel/i18n/{emqx_ee_connector_opents.hocon => emqx_bridge_opents_connector.hocon} (91%) diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index d33637ea0..3b00b454f 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -7,6 +7,7 @@ INFLUXDB_TAG=2.5.0 TDENGINE_TAG=3.0.2.4 DYNAMO_TAG=1.21.0 CASSANDRA_TAG=3.11.6 +OPENTS_TAG=9aa7f88 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server SQLSERVER_TAG=2019-CU19-ubuntu-20.04 diff --git a/.ci/docker-compose-file/docker-compose-opents.yaml b/.ci/docker-compose-file/docker-compose-opents.yaml new file mode 100644 index 000000000..545aeb015 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-opents.yaml @@ -0,0 +1,9 @@ +version: '3.9' + +services: + opents_server: + container_name: opents + image: petergrace/opentsdb-docker:${OPENTS_TAG} + restart: always + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index ba5e831a5..a1ae41e2c 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -26,6 +26,7 @@ services: - 19876:9876 - 19042:9042 - 19142:9142 + - 14242:4242 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index da2dff763..f6b31da4c 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -101,5 +101,11 @@ "listen": "0.0.0.0:1433", "upstream": "sqlserver:1433", "enabled": true + }, + { + "name": "opents", + "listen": "0.0.0.0:4242", + "upstream": "opents:4242", + "enabled": true } ] diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index fb4f264e7..f7b775f08 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -168,6 +168,7 @@ jobs: REDIS_TAG: "7.0" INFLUXDB_TAG: "2.5.0" TDENGINE_TAG: "3.0.2.4" + OPENTS_TAG: "9aa7f88" PROFILE: ${{ matrix.profile }} CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }} run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }} diff --git a/apps/emqx_bridge_opents/.gitignore b/apps/emqx_bridge_opents/.gitignore deleted file mode 100644 index f1c455451..000000000 --- a/apps/emqx_bridge_opents/.gitignore +++ /dev/null @@ -1,19 +0,0 @@ -.rebar3 -_* -.eunit -*.o -*.beam -*.plt -*.swp -*.swo -.erlang.cookie -ebin -log -erl_crash.dump -.rebar -logs -_build -.idea -*.iml -rebar3.crashdump -*~ diff --git a/apps/emqx_bridge_opents/docker-ct b/apps/emqx_bridge_opents/docker-ct new file mode 100644 index 000000000..fc68b978e --- /dev/null +++ b/apps/emqx_bridge_opents/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +opents diff --git a/apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf b/apps/emqx_bridge_opents/etc/emqx_bridge_opents.conf deleted file mode 100644 index e69de29bb..000000000 diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl new file mode 100644 index 000000000..6f444b93e --- /dev/null +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -0,0 +1,363 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_opents_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +% DB defaults +-define(BATCH_SIZE, 10). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, with_batch}, + {group, without_batch} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {with_batch, TCs}, + {without_batch, TCs} + ]. + +init_per_group(with_batch, Config0) -> + Config = [{batch_size, ?BATCH_SIZE} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{batch_size, 1} | Config0], + common_init(Config); +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok. + +init_per_testcase(_Testcase, Config) -> + delete_bridge(Config), + snabbkaffe:start_trace(), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = snabbkaffe:stop(), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +common_init(ConfigT) -> + Host = os:getenv("OPENTS_HOST", "toxiproxy"), + Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")), + + Config0 = [ + {opents_host, Host}, + {opents_port, Port}, + {proxy_name, "opents"} + | ConfigT + ], + + BridgeType = proplists:get_value(bridge_type, Config0, <<"opents">>), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + % Ensure EE bridge module is loaded + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + {Name, OpenTSConf} = opents_config(BridgeType, Config0), + Config = + [ + {opents_config, OpenTSConf}, + {opents_bridge_type, BridgeType}, + {opents_name, Name}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort} + | Config0 + ], + Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_opents); + _ -> + {skip, no_opents} + end + end. + +opents_config(BridgeType, Config) -> + Port = integer_to_list(?config(opents_port, Config)), + Server = "http://" ++ ?config(opents_host, Config) ++ ":" ++ Port, + Name = atom_to_binary(?MODULE), + BatchSize = ?config(batch_size, Config), + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " server = ~p\n" + " resource_opts = {\n" + " request_timeout = 500ms\n" + " batch_size = ~b\n" + " query_mode = sync\n" + " }\n" + "}", + [ + BridgeType, + Name, + Server, + BatchSize + ] + ), + {Name, parse_and_check(ConfigString, BridgeType, Name)}. + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> + BridgeType = ?config(opents_bridge_type, Config), + Name = ?config(opents_name, Config), + Config0 = ?config(opents_config, Config), + Config1 = emqx_utils_maps:deep_merge(Config0, Overrides), + emqx_bridge:create(BridgeType, Name, Config1). + +delete_bridge(Config) -> + BridgeType = ?config(opents_bridge_type, Config), + Name = ?config(opents_name, Config), + emqx_bridge:remove(BridgeType, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(Config, Payload) -> + Name = ?config(opents_name, Config), + BridgeType = ?config(opents_bridge_type, Config), + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), + emqx_bridge:send_message(BridgeID, Payload). + +query_resource(Config, Request) -> + query_resource(Config, Request, 1_000). + +query_resource(Config, Request, Timeout) -> + Name = ?config(opents_name, Config), + BridgeType = ?config(opents_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request, #{timeout => Timeout}). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + SentData = make_data(), + ?check_trace( + begin + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch( + {ok, 200, #{failed := 0, success := 1}}, Result + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(opents_connector_query_return, Trace0), + ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace), + ok + end + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + BridgeType = ?config(opents_bridge_type, Config), + Name = ?config(opents_name, Config), + OpentsConfig0 = ?config(opents_config, Config), + OpentsConfig = OpentsConfig0#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(OpentsConfig) + ), + SentData = make_data(), + ?check_trace( + begin + Request = {send_message, SentData}, + Res0 = query_resource(Config, Request, 2_500), + ?assertMatch( + {ok, 200, #{failed := 0, success := 1}}, Res0 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(opents_connector_query_return, Trace0), + ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace), + ok + end + ), + ok. + +t_get_status(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + Name = ?config(opents_name, Config), + BridgeType = ?config(opents_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + ok. + +t_create_disconnected(Config) -> + BridgeType = proplists:get_value(bridge_type, Config, <<"opents">>), + Config1 = lists:keyreplace(opents_port, 1, Config, {opents_port, 61234}), + {_Name, OpenTSConf} = opents_config(BridgeType, Config1), + + Config2 = lists:keyreplace(opents_config, 1, Config1, {opents_config, OpenTSConf}), + ?assertMatch({ok, _}, create_bridge(Config2)), + + Name = ?config(opents_name, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)), + ok. + +t_write_failure(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge(Config), + SentData = make_data(), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch({error, _}, Result), + ok + end), + ok. + +t_write_timeout(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge( + Config, + #{ + <<"resource_opts">> => #{ + <<"request_timeout">> => 500, + <<"resume_interval">> => 100, + <<"health_check_interval">> => 100 + } + } + ), + SentData = make_data(), + emqx_common_test_helpers:with_failure( + timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + query_resource(Config, {send_message, SentData}) + ) + end + ), + ok. + +t_missing_data(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, #{}), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch( + {error, {400, #{failed := 1, success := 0}}}, + Result + ), + ok. + +t_bad_data(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Data = maps:without([metric], make_data()), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, Data), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + + ?assertMatch( + {error, {400, #{failed := 1, success := 0}}}, Result + ), + ok. + +make_data() -> + make_data(<<"cpu">>, 12). + +make_data(Metric, Value) -> + #{ + metric => Metric, + tags => #{ + <<"host">> => <<"serverA">> + }, + value => Value + }. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl index 457fde0a0..dfc960493 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl @@ -108,13 +108,15 @@ on_batch_query( do_query(InstanceId, Datas, State). on_get_status(_InstanceId, #{server := Server}) -> - case opentsdb_connectivity(Server) of - ok -> - connected; - {error, Reason} -> - ?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}), - connecting - end. + Result = + case opentsdb_connectivity(Server) of + ok -> + connected; + {error, Reason} -> + ?SLOG(error, #{msg => "OpenTSDB lost connection", reason => Reason}), + connecting + end, + Result. %%======================================================================================== %% Helper fns diff --git a/mix.exs b/mix.exs index c5d6df804..e2230d55d 100644 --- a/mix.exs +++ b/mix.exs @@ -157,6 +157,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_kafka, :emqx_bridge_gcp_pubsub, :emqx_bridge_cassandra, + :emqx_bridge_opents, :emqx_bridge_clickhouse, :emqx_bridge_dynamo, :emqx_bridge_hstreamdb, @@ -182,7 +183,8 @@ defmodule EMQXUmbrella.MixProject do {:brod, github: "kafka4beam/brod", tag: "3.16.8"}, {:snappyer, "1.2.8", override: true}, {:crc32cer, "0.1.8", override: true}, - {:supervisor3, "1.1.12", override: true} + {:supervisor3, "1.1.12", override: true}, + {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true} ] end @@ -360,6 +362,7 @@ defmodule EMQXUmbrella.MixProject do emqx_bridge_kafka: :permanent, emqx_bridge_gcp_pubsub: :permanent, emqx_bridge_cassandra: :permanent, + emqx_bridge_opents: :permanent, emqx_bridge_clickhouse: :permanent, emqx_bridge_dynamo: :permanent, emqx_bridge_hstreamdb: :permanent, diff --git a/rebar.config.erl b/rebar.config.erl index 88471c39d..3c863046f 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -81,6 +81,7 @@ is_enterprise(ee) -> true. is_community_umbrella_app("apps/emqx_bridge_kafka") -> false; is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false; is_community_umbrella_app("apps/emqx_bridge_cassandra") -> false; +is_community_umbrella_app("apps/emqx_bridge_opents") -> false; is_community_umbrella_app("apps/emqx_bridge_clickhouse") -> false; is_community_umbrella_app("apps/emqx_bridge_dynamo") -> false; is_community_umbrella_app("apps/emqx_bridge_hstreamdb") -> false; @@ -455,6 +456,7 @@ relx_apps_per_edition(ee) -> emqx_bridge_kafka, emqx_bridge_gcp_pubsub, emqx_bridge_cassandra, + emqx_bridge_opents, emqx_bridge_clickhouse, emqx_bridge_dynamo, emqx_bridge_hstreamdb, diff --git a/rel/i18n/emqx_ee_connector_opents.hocon b/rel/i18n/emqx_bridge_opents_connector.hocon similarity index 91% rename from rel/i18n/emqx_ee_connector_opents.hocon rename to rel/i18n/emqx_bridge_opents_connector.hocon index 4e51454c9..cd82809d2 100644 --- a/rel/i18n/emqx_ee_connector_opents.hocon +++ b/rel/i18n/emqx_bridge_opents_connector.hocon @@ -1,4 +1,4 @@ -emqx_ee_connector_opents { +emqx_bridge_opents_connector { server.desc: """The URL of OpenTSDB endpoint.""" diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index ab7fff444..c1a01a593 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -115,7 +115,11 @@ case "${WHICH_APP}" in export PROFILE='emqx' fi ;; - *) + apps/emqx_bridge_opents) + ## ensure enterprise profile when testing ee applications + export PROFILE='emqx-enterprise' + ;; + *) export PROFILE="${PROFILE:-emqx}" ;; esac @@ -188,6 +192,9 @@ for dep in ${CT_DEPS}; do ODBC_REQUEST='yes' FILES+=( '.ci/docker-compose-file/docker-compose-sqlserver.yaml' ) ;; + opents) + FILES+=( '.ci/docker-compose-file/docker-compose-opents.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 diff --git a/scripts/find-apps.sh b/scripts/find-apps.sh index bfb6ba2cc..64d28529f 100755 --- a/scripts/find-apps.sh +++ b/scripts/find-apps.sh @@ -72,6 +72,9 @@ describe_app() { runner="docker" fi case "${app}" in + apps/emqx_bridge_opents) + profile='emqx-enterprise' + ;; apps/*) if [[ -f "${app}/BSL.txt" ]]; then profile='emqx-enterprise' From 540518eac308608a310f89782c4e1ca83852af98 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 18 Apr 2023 10:24:39 +0800 Subject: [PATCH 3/6] chore: add README for OpenTSDB bridge --- apps/emqx_bridge_opents/README.md | 39 ++++++++++++++++++++++++++----- scripts/ct/run.sh | 6 +---- scripts/find-apps.sh | 3 --- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/apps/emqx_bridge_opents/README.md b/apps/emqx_bridge_opents/README.md index a172cba15..a1d6511ee 100644 --- a/apps/emqx_bridge_opents/README.md +++ b/apps/emqx_bridge_opents/README.md @@ -1,9 +1,36 @@ -emqx_bridge_opentsdb -===== +# EMQX OpenTSDB Bridge -An OTP application +[OpenTSDB](http://opentsdb.net) is a distributed, scalable Time Series Database (TSDB) written on top of HBase. -Build ------ +OpenTSDB was written to address a common need: store, index and serve metrics collected from computer systems (network gear, operating systems, applications) at a large scale, and make this data easily accessible and graphable. - $ rebar3 compile +OpenTSDB allows you to collect thousands of metrics from tens of thousands of hosts and applications, at a high rate (every few seconds). + +OpenTSDB will never delete or downsample data and can easily store hundreds of billions of data points. + +The application is used to connect EMQX and OpenTSDB. User can create a rule and easily ingest IoT data into OpenTSDB by leveraging the +[EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html). + + +# Documentation + +- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) + for the EMQX rules engine introduction. + + +# HTTP APIs + +- Several APIs are provided for bridge management, which includes create bridge, + update bridge, get bridge, stop or restart bridge and list bridges etc. + + Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information. + + +# Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + + +# License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index c1a01a593..c153669f4 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -115,11 +115,7 @@ case "${WHICH_APP}" in export PROFILE='emqx' fi ;; - apps/emqx_bridge_opents) - ## ensure enterprise profile when testing ee applications - export PROFILE='emqx-enterprise' - ;; - *) + *) export PROFILE="${PROFILE:-emqx}" ;; esac diff --git a/scripts/find-apps.sh b/scripts/find-apps.sh index 64d28529f..bfb6ba2cc 100755 --- a/scripts/find-apps.sh +++ b/scripts/find-apps.sh @@ -72,9 +72,6 @@ describe_app() { runner="docker" fi case "${app}" in - apps/emqx_bridge_opents) - profile='emqx-enterprise' - ;; apps/*) if [[ -f "${app}/BSL.txt" ]]; then profile='emqx-enterprise' From 6631fb7457efd61697656d4549886ca9fc4d4287 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 18 Apr 2023 10:41:30 +0800 Subject: [PATCH 4/6] chore: update changes --- changes/ee/feat-10425.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-10425.en.md diff --git a/changes/ee/feat-10425.en.md b/changes/ee/feat-10425.en.md new file mode 100644 index 000000000..7144241df --- /dev/null +++ b/changes/ee/feat-10425.en.md @@ -0,0 +1 @@ +Implement OpenTSDB data bridge. From 932a327952d7fbaeeaecc89a10bc10440940187b Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 20 Apr 2023 15:11:37 +0800 Subject: [PATCH 5/6] chore: make spellcheck and xref happy --- .../emqx_ee_connector/src/emqx_ee_connector_opents.erl | 10 +++++----- rel/i18n/emqx_bridge_opents_connector.hocon | 4 ++-- scripts/spellcheck/dicts/emqx.txt | 1 + 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl index dfc960493..633e120bd 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl @@ -75,10 +75,10 @@ on_start( {pool_size, PoolSize} ], - State = #{poolname => InstanceId, server => Server}, + State = #{pool_name => InstanceId, server => Server}, case opentsdb_connectivity(Server) of ok -> - case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of + case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; Error -> @@ -89,12 +89,12 @@ on_start( Error end. -on_stop(InstanceId, #{poolname := PoolName} = _State) -> +on_stop(InstanceId, #{pool_name := PoolName} = _State) -> ?SLOG(info, #{ msg => "stopping_opents_connector", connector => InstanceId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). on_query(InstanceId, Request, State) -> on_batch_query(InstanceId, [Request], State). @@ -122,7 +122,7 @@ on_get_status(_InstanceId, #{server := Server}) -> %% Helper fns %%======================================================================================== -do_query(InstanceId, Query, #{poolname := PoolName} = State) -> +do_query(InstanceId, Query, #{pool_name := PoolName} = State) -> ?TRACE( "QUERY", "opents_connector_received", diff --git a/rel/i18n/emqx_bridge_opents_connector.hocon b/rel/i18n/emqx_bridge_opents_connector.hocon index cd82809d2..5c39d1e0e 100644 --- a/rel/i18n/emqx_bridge_opents_connector.hocon +++ b/rel/i18n/emqx_bridge_opents_connector.hocon @@ -7,13 +7,13 @@ emqx_bridge_opents_connector { "URL" summary.desc: - """Whether or not to return summary information.""" + """Whether to return summary information.""" summary.label: "Summary" details.desc: - """Whether or not to return detailed information.""" + """Whether to return detailed information.""" details.label: "Details" diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 168275e1e..a9afcf6ca 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -274,3 +274,4 @@ clickhouse FormatType RocketMQ Keyspace +OpenTSDB From 5ad5d7ee8dc60c9ed3c9812a9261e83d6b5ce63e Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 21 Apr 2023 18:32:14 +0800 Subject: [PATCH 6/6] fix(opents): adjust code structure --- apps/emqx_bridge_opents/src/emqx_bridge_opents.erl | 2 +- .../emqx_bridge_opents/src/emqx_bridge_opents_connector.erl | 2 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 2 +- ...onnector_opents.hocon => emqx_bridge_opents_connector.hocon} | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl => apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl (99%) rename rel/i18n/zh/{emqx_ee_connector_opents.hocon => emqx_bridge_opents_connector.hocon} (90%) diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl index 9001e391c..2eb6a554f 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -60,7 +60,7 @@ fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})} ] ++ emqx_resource_schema:fields("resource_opts") ++ - emqx_ee_connector_opents:fields(config); + emqx_bridge_opents_connector:fields(config); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl similarity index 99% rename from lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl rename to apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 633e120bd..0366c9dc2 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_connector_opents). +-module(emqx_bridge_opents_connector). -behaviour(emqx_resource). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 636166d90..4b83fda3f 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -97,7 +97,7 @@ resource_type(clickhouse) -> emqx_ee_connector_clickhouse; resource_type(dynamo) -> emqx_ee_connector_dynamo; resource_type(rocketmq) -> emqx_ee_connector_rocketmq; resource_type(sqlserver) -> emqx_ee_connector_sqlserver; -resource_type(opents) -> emqx_ee_connector_opents. +resource_type(opents) -> emqx_bridge_opents_connector. fields(bridges) -> [ diff --git a/rel/i18n/zh/emqx_ee_connector_opents.hocon b/rel/i18n/zh/emqx_bridge_opents_connector.hocon similarity index 90% rename from rel/i18n/zh/emqx_ee_connector_opents.hocon rename to rel/i18n/zh/emqx_bridge_opents_connector.hocon index 7e58da9bd..f8a80b10e 100644 --- a/rel/i18n/zh/emqx_ee_connector_opents.hocon +++ b/rel/i18n/zh/emqx_bridge_opents_connector.hocon @@ -1,4 +1,4 @@ -emqx_ee_connector_opents { +emqx_bridge_opents_connector { server.desc: """服务器的地址。"""