diff --git a/.ci/docker-compose-file/docker-compose-iotdb.yaml b/.ci/docker-compose-file/docker-compose-iotdb.yaml new file mode 100644 index 000000000..2e1ea881e --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-iotdb.yaml @@ -0,0 +1,31 @@ +version: '3.9' + +services: + iotdb: + container_name: iotdb + hostname: iotdb + image: apache/iotdb:1.1.0-standalone + restart: always + environment: + - enable_rest_service=true + - cn_internal_address=iotdb + - cn_internal_port=10710 + - cn_consensus_port=10720 + - cn_target_config_node_list=iotdb:10710 + - dn_rpc_address=iotdb + - dn_internal_address=iotdb + - dn_rpc_port=6667 + - dn_mpp_data_exchange_port=10740 + - dn_schema_region_consensus_port=10750 + - dn_data_region_consensus_port=10760 + - dn_target_config_node_list=iotdb:10710 + # volumes: + # - ./data:/iotdb/data + # - ./logs:/iotdb/logs + expose: + - "18080" + # IoTDB's REST interface, uncomment for local testing + # ports: + # - "18080:18080" + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index a1ae41e2c..d91118406 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -27,6 +27,7 @@ services: - 19042:9042 - 19142:9142 - 14242:4242 + - 28080:18080 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index e4fbfa62a..dee3134f5 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -125,5 +125,11 @@ "listen": "0.0.0.0:1521", "upstream": "oracle:1521", "enabled": true + }, + { + "name": "iotdb", + "listen": "0.0.0.0:18080", + "upstream": "iotdb:18080", + "enabled": true } ] diff --git a/apps/emqx/src/emqx_logger.erl b/apps/emqx/src/emqx_logger.erl index 114a1af49..6087acd8a 100644 --- a/apps/emqx/src/emqx_logger.erl +++ b/apps/emqx/src/emqx_logger.erl @@ -237,7 +237,7 @@ set_log_handler_level(HandlerId, Level) -> end. %% @doc Set both the primary and all handlers level in one command --spec set_log_level(logger:handler_id()) -> ok | {error, term()}. +-spec set_log_level(logger:level()) -> ok | {error, term()}. set_log_level(Level) -> case set_primary_log_level(Level) of ok -> set_all_log_handlers_level(Level); diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index a37b6db3c..3aade0369 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -72,7 +72,8 @@ T == cassandra; T == sqlserver; T == pulsar_producer; - T == oracle + T == oracle; + T == iotdb ). load() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index da98b073e..a8dd76214 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -56,6 +56,11 @@ (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE) ). +%% [FIXME] this has no place here, it's used in parse_confs/3, which should +%% rather delegate to a behavior callback than implementing domain knowledge +%% here (reversed dependency) +-define(INSERT_TABLET_PATH, "/rest/v2/insertTablet"). + -if(?EMQX_RELEASE_EDITION == ee). bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; @@ -329,6 +334,30 @@ parse_confs( max_retries => Retry } }; +parse_confs(<<"iotdb">>, Name, Conf) -> + #{ + base_url := BaseURL, + authentication := + #{ + username := Username, + password := Password + } + } = Conf, + BasicToken = base64:encode(<>), + WebhookConfig = + Conf#{ + method => <<"post">>, + url => <>, + headers => [ + {<<"Content-type">>, <<"application/json">>}, + {<<"Authorization">>, BasicToken} + ] + }, + parse_confs( + <<"webhook">>, + Name, + WebhookConfig + ); parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> %% For some drivers that can be used as data-sources, we need to provide a %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl new file mode 100644 index 000000000..47f29aa36 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -0,0 +1,350 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_testlib). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% ct setup helpers + +init_per_suite(Config, Apps) -> + [{start_apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?config(start_apps, Config))), + _ = application:stop(emqx_connector), + ok. + +init_per_group(TestGroup, BridgeType, Config) -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + application:load(emqx_bridge), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps(?config(start_apps, Config)), + {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), + UniqueNum = integer_to_binary(erlang:unique_integer([positive])), + MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {mqtt_topic, MQTTTopic}, + {test_group, TestGroup}, + {bridge_type, BridgeType} + | Config + ]. + +end_per_group(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + ok. + +init_per_testcase(TestCase, Config0, BridgeConfigCb) -> + ct:timetrap(timer:seconds(60)), + delete_all_bridges(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + BridgeTopic = + << + (atom_to_binary(TestCase))/binary, + UniqueNum/binary + >>, + TestGroup = ?config(test_group, Config0), + Config = [{bridge_topic, BridgeTopic} | Config0], + {Name, ConfigString, BridgeConfig} = BridgeConfigCb( + TestCase, TestGroup, Config + ), + ok = snabbkaffe:start_trace(), + [ + {bridge_name, Name}, + {bridge_config_string, ConfigString}, + {bridge_config, BridgeConfig} + | Config + ]. + +end_per_testcase(_Testcase, Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ok; + false -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok + end. + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +%% test helpers +parse_and_check(Config, ConfigString, Name) -> + BridgeType = ?config(bridge_type, Config), + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := BridgeConfig}}} = RawConf, + BridgeConfig. + +resource_id(Config) -> + BridgeType = ?config(bridge_type, Config), + Name = ?config(bridge_name, Config), + emqx_bridge_resource:resource_id(BridgeType, Name). + +create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> + BridgeType = ?config(bridge_type, Config), + Name = ?config(bridge_name, Config), + BridgeConfig0 = ?config(bridge_config, Config), + BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + emqx_bridge:create(BridgeType, Name, BridgeConfig). + +create_bridge_api(Config) -> + create_bridge_api(Config, _Overrides = #{}). + +create_bridge_api(Config, Overrides) -> + BridgeType = ?config(bridge_type, Config), + Name = ?config(bridge_name, Config), + BridgeConfig0 = ?config(bridge_config, Config), + BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("creating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge create result: ~p", [Res]), + Res. + +update_bridge_api(Config) -> + update_bridge_api(Config, _Overrides = #{}). + +update_bridge_api(Config, Overrides) -> + BridgeType = ?config(bridge_type, Config), + Name = ?config(bridge_name, Config), + BridgeConfig0 = ?config(bridge_config, Config), + BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, Name), + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("updating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of + {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])}; + Error -> Error + end, + ct:pal("bridge update result: ~p", [Res]), + Res. + +probe_bridge_api(Config) -> + probe_bridge_api(Config, _Overrides = #{}). + +probe_bridge_api(Config, _Overrides) -> + BridgeType = ?config(bridge_type, Config), + Name = ?config(bridge_name, Config), + BridgeConfig = ?config(bridge_config, Config), + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("probing bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; + Error -> Error + end, + ct:pal("bridge probe result: ~p", [Res]), + Res. + +create_rule_and_action_http(BridgeType, RuleTopic, Config) -> + BridgeName = ?config(bridge_name, Config), + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + Params = #{ + enable => true, + sql => <<"SELECT * FROM \"", RuleTopic/binary, "\"">>, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ct:pal("rule action params: ~p", [Params]), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_sync_query(Config, MakeMessageFun, IsSuccessCheck) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + Message = {send_message, MakeMessageFun()}, + IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)), + ok + end, + [] + ), + ok. + +t_async_query(Config, MakeMessageFun, IsSuccessCheck) -> + ResourceId = resource_id(Config), + ReplyFun = + fun(Pid, Result) -> + Pid ! {result, Result} + end, + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + Message = {send_message, MakeMessageFun()}, + emqx_resource:query(ResourceId, Message, #{async_reply_fun => {ReplyFun, [self()]}}), + ok + end, + [] + ), + receive + {result, Result} -> IsSuccessCheck(Result) + after 5_000 -> + throw(timeout) + end, + ok. + +t_create_via_http(Config) -> + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + + %% lightweight matrix testing some configs + ?assertMatch( + {ok, _}, + update_bridge_api( + Config + ) + ), + ?assertMatch( + {ok, _}, + update_bridge_api( + Config + ) + ), + ok + end, + [] + ), + ok. + +t_start_stop(Config, StopTracePoint) -> + BridgeType = ?config(bridge_type, Config), + BridgeName = ?config(bridge_name, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge(Config)), + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes0 = probe_bridge_api( + Config, + #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ProbeRes1 = probe_bridge_api( + Config, + #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + %% Now stop the bridge. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge:disable_enable(disable, BridgeType, BridgeName), + #{?snk_kind := StopTracePoint}, + 5_000 + ) + ), + + ok + end, + fun(Trace) -> + %% one for each probe, one for real + ?assertMatch([_, _, _], ?of_kind(StopTracePoint, Trace)), + ok + end + ), + ok. + +t_on_get_status(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = resource_id(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ct:sleep(500), + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + end), + %% Check that it recovers itself. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ok. diff --git a/apps/emqx_bridge_iotdb/.gitignore b/apps/emqx_bridge_iotdb/.gitignore new file mode 100644 index 000000000..e9bc1c544 --- /dev/null +++ b/apps/emqx_bridge_iotdb/.gitignore @@ -0,0 +1,19 @@ +.rebar3 + _* + .eunit + *.o + *.beam + *.plt + *.swp + *.swo + .erlang.cookie + ebin + log + erl_crash.dump + .rebar + logs + _build + .idea + *.iml + rebar3.crashdump + *~ diff --git a/apps/emqx_bridge_iotdb/BSL.txt b/apps/emqx_bridge_iotdb/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_iotdb/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_iotdb/README.md b/apps/emqx_bridge_iotdb/README.md new file mode 100644 index 000000000..48f5d74c2 --- /dev/null +++ b/apps/emqx_bridge_iotdb/README.md @@ -0,0 +1,26 @@ +# Apache IoTDB Data Integration Bridge + +This application houses the IoTDB data integration bridge for EMQX Enterprise + Edition. It provides the means to connect to IoTDB and publish messages to it. + +It implements the connection management and interaction without need for a + separate connector app, since it's not used by authentication and authorization + applications. + +# Documentation links + +For more information on Apache IoTDB, please see its [official + site](https://iotdb.apache.org/). + +# Configurations + +Please see [our official + documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-iotdb.html) + for more detailed info. + +# Contributing - [Mandatory] +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_iotdb/docker-ct b/apps/emqx_bridge_iotdb/docker-ct new file mode 100644 index 000000000..8a8973a88 --- /dev/null +++ b/apps/emqx_bridge_iotdb/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +iotdb diff --git a/apps/emqx_bridge_iotdb/etc/emqx_bridge_iotdb.conf b/apps/emqx_bridge_iotdb/etc/emqx_bridge_iotdb.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl new file mode 100644 index 000000000..5e6bf9ac5 --- /dev/null +++ b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl @@ -0,0 +1,11 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_IOTDB_HRL). +-define(EMQX_BRIDGE_IOTDB_HRL, true). + +-define(VSN_1_0_X, 'v1.0.x'). +-define(VSN_0_13_X, 'v0.13.x'). + +-endif. diff --git a/apps/emqx_bridge_iotdb/rebar.config b/apps/emqx_bridge_iotdb/rebar.config new file mode 100644 index 000000000..a4afd2877 --- /dev/null +++ b/apps/emqx_bridge_iotdb/rebar.config @@ -0,0 +1,14 @@ +%% -*- mode: erlang -*- + +{erl_opts, [ + debug_info +]}. + +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}} +]}. +{plugins, [rebar3_path_deps]}. +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src new file mode 100644 index 000000000..9c5108307 --- /dev/null +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -0,0 +1,22 @@ +%% -*- mode: erlang -*- +{application, emqx_bridge_iotdb, [ + {description, "EMQX Enterprise Apache IoTDB Bridge"}, + {vsn, "0.1.0"}, + {modules, [ + emqx_bridge_iotdb, + emqx_bridge_iotdb_impl + ]}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_connector + ]}, + {env, []}, + {licenses, ["Business Source License 1.1"]}, + {maintainers, ["EMQX Team "]}, + {links, [ + {"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} +]}. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl new file mode 100644 index 000000000..e0312bb02 --- /dev/null +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -0,0 +1,232 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_iotdb). + +-include("emqx_bridge_iotdb.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +%% hocon_schema API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% emqx_ee_bridge "unofficial" API +-export([conn_bridge_examples/1]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> "bridge_iotdb". + +roots() -> []. + +fields("config") -> + basic_config() ++ request_config(); +fields("post") -> + [ + type_field(), + name_field() + ] ++ fields("config"); +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"); +fields("creation_opts") -> + lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_resource_schema:fields("creation_opts") + ); +fields(auth_basic) -> + [ + {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, + {password, + mk(binary(), #{ + required => true, + desc => ?DESC("config_auth_basic_password"), + sensitive => true, + converter => fun emqx_schema:password_converter/2 + })} + ]. + +desc("config") -> + ?DESC("desc_config"); +desc("creation_opts") -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc("post") -> + ["Configuration for IoTDB using `POST` method."]; +desc(Name) -> + lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), + ?DESC(Name). + +struct_names() -> + [ + auth_basic + ]. + +basic_config() -> + [ + {enable, + mk( + boolean(), + #{ + desc => ?DESC("config_enable"), + default => true + } + )}, + {authentication, + mk( + hoconsc:union([ref(?MODULE, auth_basic)]), + #{ + default => auth_basic, desc => ?DESC("config_authentication") + } + )}, + {is_aligned, + mk( + boolean(), + #{ + desc => ?DESC("config_is_aligned"), + default => false + } + )}, + {device_id, + mk( + binary(), + #{ + desc => ?DESC("config_device_id") + } + )}, + {iotdb_version, + mk( + hoconsc:enum([?VSN_1_0_X, ?VSN_0_13_X]), + #{ + desc => ?DESC("config_iotdb_version"), + default => ?VSN_1_0_X + } + )} + ] ++ resource_creation_opts() ++ + proplists_without( + [max_retries, base_url, request], + emqx_connector_http:fields(config) + ). + +proplists_without(Keys, List) -> + [El || El = {K, _} <- List, not lists:member(K, Keys)]. + +request_config() -> + [ + {base_url, + mk( + emqx_schema:url(), + #{ + desc => ?DESC("config_base_url") + } + )}, + {max_retries, + mk( + non_neg_integer(), + #{ + default => 2, + desc => ?DESC("config_max_retries") + } + )}, + {request_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"15s">>, + desc => ?DESC("config_request_timeout") + } + )} + ]. + +resource_creation_opts() -> + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ]. + +unsupported_opts() -> + [ + batch_size, + batch_time + ]. + +%%====================================================================================== + +type_field() -> + {type, + mk( + hoconsc:enum([iotdb]), + #{ + required => true, + desc => ?DESC("desc_type") + } + )}. + +name_field() -> + {name, + mk( + binary(), + #{ + required => true, + desc => ?DESC("desc_name") + } + )}. + +%%====================================================================================== + +conn_bridge_examples(Method) -> + [ + #{ + <<"iotdb">> => + #{ + summary => <<"Apache IoTDB Bridge">>, + value => conn_bridge_example(Method, iotdb) + } + } + ]. + +conn_bridge_example(_Method, Type) -> + #{ + name => <<"My IoTDB Bridge">>, + type => Type, + enable => true, + authentication => #{ + <<"username">> => <<"root">>, + <<"password">> => <<"*****">> + }, + is_aligned => false, + device_id => <<"my_device">>, + base_url => <<"http://iotdb.local:18080/">>, + iotdb_version => ?VSN_1_0_X, + connect_timeout => <<"15s">>, + pool_type => <<"random">>, + pool_size => 8, + enable_pipelining => 100, + ssl => #{enable => false}, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl new file mode 100644 index 000000000..2f8794560 --- /dev/null +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -0,0 +1,382 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_iotdb_impl). + +-include("emqx_bridge_iotdb.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_get_status/2, + on_query/3, + on_query_async/4 +]). + +-type config() :: + #{ + base_url := #{ + scheme := http | https, + host := iolist(), + port := inet:port_number(), + path := '_' + }, + connect_timeout := pos_integer(), + pool_type := random | hash, + pool_size := pos_integer(), + request := undefined | map(), + is_aligned := boolean(), + iotdb_version := binary(), + device_id := binary() | undefined, + atom() => '_' + }. + +-type state() :: + #{ + base_path := '_', + base_url := #{ + scheme := http | https, + host := iolist(), + port := inet:port_number(), + path := '_' + }, + connect_timeout := pos_integer(), + pool_type := random | hash, + pool_size := pos_integer(), + request := undefined | map(), + is_aligned := boolean(), + iotdb_version := binary(), + device_id := binary() | undefined, + atom() => '_' + }. + +-type manager_id() :: binary(). + +%%------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------- +callback_mode() -> async_if_possible. + +-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). +on_start(InstanceId, Config) -> + %% [FIXME] The configuration passed in here is pre-processed and transformed + %% in emqx_bridge_resource:parse_confs/2. + case emqx_connector_http:on_start(InstanceId, Config) of + {ok, State} -> + ?SLOG(info, #{ + msg => "iotdb_bridge_started", + instance_id => InstanceId, + request => maps:get(request, State, <<>>) + }), + ?tp(iotdb_bridge_started, #{}), + {ok, maps:merge(Config, State)}; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_start_iotdb_bridge", + instance_id => InstanceId, + base_url => maps:get(request, Config, <<>>), + reason => Reason + }), + throw(failed_to_start_iotdb_bridge) + end. + +-spec on_stop(manager_id(), state()) -> ok | {error, term()}. +on_stop(InstanceId, State) -> + ?SLOG(info, #{ + msg => "stopping_iotdb_bridge", + connector => InstanceId + }), + Res = emqx_connector_http:on_stop(InstanceId, State), + ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}), + Res. + +-spec on_get_status(manager_id(), state()) -> + {connected, state()} | {disconnected, state(), term()}. +on_get_status(InstanceId, State) -> + emqx_connector_http:on_get_status(InstanceId, State). + +-spec on_query(manager_id(), {send_message, map()}, state()) -> + {ok, pos_integer(), [term()], term()} + | {ok, pos_integer(), [term()]} + | {error, term()}. +on_query(InstanceId, {send_message, Message}, State) -> + ?SLOG(debug, #{ + msg => "iotdb_bridge_on_query_called", + instance_id => InstanceId, + send_message => Message, + state => emqx_utils:redact(State) + }), + IoTDBPayload = make_iotdb_insert_request(Message, State), + handle_response( + emqx_connector_http:on_query( + InstanceId, {send_message, IoTDBPayload}, State + ) + ). + +-spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> + {ok, pid()}. +on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> + ?SLOG(debug, #{ + msg => "iotdb_bridge_on_query_async_called", + instance_id => InstanceId, + send_message => Message, + state => emqx_utils:redact(State) + }), + IoTDBPayload = make_iotdb_insert_request(Message, State), + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_connector_http:on_query_async( + InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State + ). + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- + +preproc_data(DataList) -> + lists:map( + fun( + #{ + measurement := Measurement, + data_type := DataType, + value := Value + } = Data + ) -> + #{ + timestamp => emqx_plugin_libs_rule:preproc_tmpl( + maps:get(<<"timestamp">>, Data, <<"now">>) + ), + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_plugin_libs_rule:preproc_tmpl(Value) + } + end, + DataList + ). + +proc_data(PreProcessedData, Msg) -> + NowNS = erlang:system_time(nanosecond), + Nows = #{ + now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond), + now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond), + now_ns => NowNS + }, + lists:map( + fun( + #{ + timestamp := TimestampTkn, + measurement := Measurement, + data_type := DataType, + value := ValueTkn + } + ) -> + #{ + timestamp => iot_timestamp( + emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows + ), + measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg), + data_type => DataType, + value => proc_value(DataType, ValueTkn, Msg) + } + end, + PreProcessedData + ). + +iot_timestamp(Timestamp, #{now_ms := NowMs}) when + Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> +-> + NowMs; +iot_timestamp(Timestamp, #{now_us := NowUs}) when Timestamp =:= <<"now_us">> -> + NowUs; +iot_timestamp(Timestamp, #{now_ns := NowNs}) when Timestamp =:= <<"now_ns">> -> + NowNs; +iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> + binary_to_integer(Timestamp). + +proc_value(<<"TEXT">>, ValueTkn, Msg) -> + case emqx_plugin_libs_rule:proc_tmpl(ValueTkn, Msg) of + <<"undefined">> -> null; + Val -> Val + end; +proc_value(<<"BOOLEAN">>, ValueTkn, Msg) -> + convert_bool(replace_var(ValueTkn, Msg)); +proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> -> + convert_int(replace_var(ValueTkn, Msg)); +proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> -> + convert_float(replace_var(ValueTkn, Msg)). + +replace_var(Tokens, Data) when is_list(Tokens) -> + [Val] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + Val; +replace_var(Val, _Data) -> + Val. + +convert_bool(B) when is_boolean(B) -> B; +convert_bool(1) -> true; +convert_bool(0) -> false; +convert_bool(<<"1">>) -> true; +convert_bool(<<"0">>) -> false; +convert_bool(<<"true">>) -> true; +convert_bool(<<"True">>) -> true; +convert_bool(<<"TRUE">>) -> true; +convert_bool(<<"false">>) -> false; +convert_bool(<<"False">>) -> false; +convert_bool(<<"FALSE">>) -> false; +convert_bool(undefined) -> null. + +convert_int(Int) when is_integer(Int) -> Int; +convert_int(Float) when is_float(Float) -> floor(Float); +convert_int(Str) when is_binary(Str) -> + try + binary_to_integer(Str) + catch + _:_ -> + convert_int(binary_to_float(Str)) + end; +convert_int(undefined) -> + null. + +convert_float(Float) when is_float(Float) -> Float; +convert_float(Int) when is_integer(Int) -> Int * 10 / 10; +convert_float(Str) when is_binary(Str) -> + try + binary_to_float(Str) + catch + _:_ -> + convert_float(binary_to_integer(Str)) + end; +convert_float(undefined) -> + null. + +make_iotdb_insert_request(Message, State) -> + IsAligned = maps:get(is_aligned, State, false), + DeviceId = device_id(Message, State), + IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X), + Payload = make_list(maps:get(payload, Message)), + PreProcessedData = preproc_data(Payload), + DataList = proc_data(PreProcessedData, Message), + InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, + Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, + iotdb_field_key(device_id, IotDBVsn) => DeviceId + }). + +replace_dtypes(Rows, IotDBVsn) -> + {Types, Map} = maps:take(dtypes, Rows), + Map#{iotdb_field_key(data_types, IotDBVsn) => Types}. + +aggregate_rows(DataList, InitAcc) -> + lists:foldr( + fun( + #{ + timestamp := Timestamp, + measurement := Measurement, + data_type := DataType, + value := Data + }, + #{ + timestamps := AccTs, + measurements := AccM, + dtypes := AccDt, + values := AccV + } = Acc + ) -> + Timestamps = [Timestamp | AccTs], + case index_of(Measurement, AccM) of + 0 -> + Acc#{ + timestamps => Timestamps, + values => [pad_value(Data, length(AccTs)) | pad_existing_values(AccV)], + measurements => [Measurement | AccM], + dtypes => [DataType | AccDt] + }; + Index -> + Acc#{ + timestamps => Timestamps, + values => insert_value(Index, Data, AccV), + measurements => AccM, + dtypes => AccDt + } + end + end, + InitAcc, + DataList + ). + +pad_value(Data, N) -> + [Data | lists:duplicate(N, null)]. + +pad_existing_values(Values) -> + [[null | Value] || Value <- Values]. + +index_of(E, List) -> + string:str(List, [E]). + +insert_value(_Index, _Data, []) -> + []; +insert_value(1, Data, [Value | Values]) -> + [[Data | Value] | insert_value(0, Data, Values)]; +insert_value(Index, Data, [Value | Values]) -> + [[null | Value] | insert_value(Index - 1, Data, Values)]. + +iotdb_field_key(is_aligned, ?VSN_1_0_X) -> + <<"is_aligned">>; +iotdb_field_key(is_aligned, ?VSN_0_13_X) -> + <<"isAligned">>; +iotdb_field_key(device_id, ?VSN_1_0_X) -> + <<"device">>; +iotdb_field_key(device_id, ?VSN_0_13_X) -> + <<"deviceId">>; +iotdb_field_key(data_types, ?VSN_1_0_X) -> + <<"data_types">>; +iotdb_field_key(data_types, ?VSN_0_13_X) -> + <<"dataTypes">>. + +make_list(List) when is_list(List) -> List; +make_list(Data) -> [Data]. + +device_id(Message, State) -> + case maps:get(device_id, State, undefined) of + undefined -> + case maps:get(payload, Message) of + #{device_id := DeviceId} -> + DeviceId; + _NotFound -> + Topic = maps:get(topic, Message), + case re:replace(Topic, "/", ".", [global, {return, binary}]) of + <<"root.", _/binary>> = Device -> Device; + Device -> <<"root.", Device/binary>> + end + end; + DeviceId -> + DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId), + emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message) + end. + +handle_response({ok, 200, _Headers, Body} = Resp) -> + eval_response_body(Body, Resp); +handle_response({ok, 200, Body} = Resp) -> + eval_response_body(Body, Resp); +handle_response({ok, Code, _Headers, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({ok, Code, Body}) -> + {error, #{code => Code, body => Body}}; +handle_response({error, _} = Error) -> + Error. + +eval_response_body(Body, Resp) -> + case emqx_utils_json:decode(Body) of + #{<<"code">> := 200} -> Resp; + Reason -> {error, Reason} + end. diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl new file mode 100644 index 000000000..434587cf0 --- /dev/null +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -0,0 +1,229 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_iotdb_impl_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(BRIDGE_TYPE_BIN, <<"iotdb">>). +-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, plain} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {plain, AllTCs} + ]. + +init_per_suite(Config) -> + emqx_bridge_testlib:init_per_suite(Config, ?APPS). + +end_per_suite(Config) -> + emqx_bridge_testlib:end_per_suite(Config). + +init_per_group(plain = Type, Config0) -> + Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"), + Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", "18080")), + ProxyName = "iotdb", + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), + [ + {bridge_host, Host}, + {bridge_port, Port}, + {proxy_name, ProxyName} + | Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_iotdb); + _ -> + {skip, no_iotdb} + end + end; +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when + Group =:= plain +-> + emqx_bridge_testlib:end_per_group(Config), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config0) -> + Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3), + reset_service(Config), + Config. + +end_per_testcase(TestCase, Config) -> + emqx_bridge_testlib:end_per_testcase(TestCase, Config). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +bridge_config(TestCase, _TestGroup, Config) -> + UniqueNum = integer_to_binary(erlang:unique_integer()), + Host = ?config(bridge_host, Config), + Port = ?config(bridge_port, Config), + Name = << + (atom_to_binary(TestCase))/binary, UniqueNum/binary + >>, + ServerURL = iolist_to_binary([ + "http://", + Host, + ":", + integer_to_binary(Port) + ]), + ConfigString = + io_lib:format( + "bridges.iotdb.~s {\n" + " enable = true\n" + " base_url = \"~s\"\n" + " authentication = {\n" + " username = \"root\"\n" + " password = \"root\"\n" + " }\n" + " pool_size = 1\n" + " resource_opts = {\n" + " auto_restart_interval = 5000\n" + " request_timeout = 30000\n" + " query_mode = \"async\"\n" + " worker_pool_size = 1\n" + " }\n" + "}\n", + [ + Name, + ServerURL + ] + ), + {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. + +reset_service(Config) -> + _BridgeConfig = + #{ + <<"base_url">> := BaseURL, + <<"authentication">> := #{ + <<"username">> := Username, + <<"password">> := Password + } + } = + ?config(bridge_config, Config), + ct:pal("bridge config: ~p", [_BridgeConfig]), + Path = <>, + BasicToken = base64:encode(<>), + Headers = [ + {"Content-type", "application/json"}, + {"Authorization", binary_to_list(BasicToken)} + ], + Device = iotdb_device(Config), + Body = #{sql => <<"delete from ", Device/binary, ".*">>}, + {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}). + +make_iotdb_payload(DeviceId) -> + make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"). + +make_iotdb_payload(DeviceId, Measurement, Type, Value) -> + #{ + measurement => Measurement, + data_type => Type, + value => Value, + device_id => DeviceId, + is_aligned => false + }. + +make_message_fun(Topic, Payload) -> + fun() -> + MsgId = erlang:unique_integer([positive]), + #{ + topic => Topic, + id => MsgId, + payload => Payload, + retain => true + } + end. + +iotdb_device(Config) -> + MQTTTopic = ?config(mqtt_topic, Config), + Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]), + <<"root.", Device/binary>>. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_sync_query_simple(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), + MakeMessageFun = make_message_fun(DeviceId, Payload), + IsSuccessCheck = + fun(Result) -> + ?assertEqual(ok, element(1, Result)) + end, + emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck). + +t_async_query(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), + MakeMessageFun = make_message_fun(DeviceId, Payload), + IsSuccessCheck = + fun(Result) -> + ?assertEqual(ok, element(1, Result)) + end, + emqx_bridge_testlib:t_async_query(Config, MakeMessageFun, IsSuccessCheck). + +t_sync_query_aggregated(Config) -> + DeviceId = iotdb_device(Config), + Payload = [ + make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), + (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"mow_us">>}, + (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"mow_ns">>}, + make_iotdb_payload(DeviceId, "charged", <<"BOOLEAN">>, "1"), + make_iotdb_payload(DeviceId, "stoked", <<"BOOLEAN">>, "true"), + make_iotdb_payload(DeviceId, "enriched", <<"BOOLEAN">>, <<"TRUE">>), + make_iotdb_payload(DeviceId, "drained", <<"BOOLEAN">>, "0"), + make_iotdb_payload(DeviceId, "dazzled", <<"BOOLEAN">>, "false"), + make_iotdb_payload(DeviceId, "unplugged", <<"BOOLEAN">>, <<"FALSE">>), + make_iotdb_payload(DeviceId, "weight", <<"FLOAT">>, "87.3"), + make_iotdb_payload(DeviceId, "foo", <<"TEXT">>, <<"bar">>) + ], + MakeMessageFun = make_message_fun(DeviceId, Payload), + IsSuccessCheck = + fun(Result) -> + ?assertEqual(ok, element(1, Result)) + end, + emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck). + +t_sync_query_fail(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "Anton"), + MakeMessageFun = make_message_fun(DeviceId, Payload), + IsSuccessCheck = + fun(Result) -> + ?assertEqual(error, element(1, Result)) + end, + emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck). + +t_create_via_http(Config) -> + emqx_bridge_testlib:t_create_via_http(Config). + +t_start_stop(Config) -> + emqx_bridge_testlib:t_start_stop(Config, iotdb_bridge_stopped). + +t_on_get_status(Config) -> + emqx_bridge_testlib:t_on_get_status(Config). diff --git a/changes/ee/feat-10560.en.md b/changes/ee/feat-10560.en.md new file mode 100644 index 000000000..c5bc59d69 --- /dev/null +++ b/changes/ee/feat-10560.en.md @@ -0,0 +1 @@ +Add enterprise data bridge for Apache IoTDB. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index c3032590e..8581f79b3 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -38,7 +38,8 @@ api_schemas(Method) -> ref(emqx_bridge_sqlserver, Method), ref(emqx_bridge_opents, Method), ref(emqx_bridge_pulsar, Method ++ "_producer"), - ref(emqx_bridge_oracle, Method) + ref(emqx_bridge_oracle, Method), + ref(emqx_bridge_iotdb, Method) ]. schema_modules() -> @@ -61,7 +62,8 @@ schema_modules() -> emqx_bridge_sqlserver, emqx_bridge_opents, emqx_bridge_pulsar, - emqx_bridge_oracle + emqx_bridge_oracle, + emqx_bridge_iotdb ]. examples(Method) -> @@ -103,7 +105,8 @@ resource_type(rocketmq) -> emqx_ee_connector_rocketmq; resource_type(sqlserver) -> emqx_bridge_sqlserver_connector; resource_type(opents) -> emqx_bridge_opents_connector; resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; -resource_type(oracle) -> emqx_oracle. +resource_type(oracle) -> emqx_oracle; +resource_type(iotdb) -> emqx_bridge_iotdb_impl. fields(bridges) -> [ @@ -178,6 +181,14 @@ fields(bridges) -> desc => <<"Oracle Bridge Config">>, required => false } + )}, + {iotdb, + mk( + hoconsc:map(name, ref(emqx_bridge_iotdb, "config")), + #{ + desc => <<"Apache IoTDB Bridge Config">>, + required => false + } )} ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ diff --git a/mix.exs b/mix.exs index 469337af8..9383fa085 100644 --- a/mix.exs +++ b/mix.exs @@ -162,6 +162,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_dynamo, :emqx_bridge_hstreamdb, :emqx_bridge_influxdb, + :emqx_bridge_iotdb, :emqx_bridge_matrix, :emqx_bridge_mongodb, :emqx_bridge_mysql, @@ -372,6 +373,7 @@ defmodule EMQXUmbrella.MixProject do emqx_bridge_dynamo: :permanent, emqx_bridge_hstreamdb: :permanent, emqx_bridge_influxdb: :permanent, + emqx_bridge_iotdb: :permanent, emqx_bridge_matrix: :permanent, emqx_bridge_mongodb: :permanent, emqx_bridge_mysql: :permanent, diff --git a/rebar.config.erl b/rebar.config.erl index 524afe5bf..bb3bbbab6 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -86,6 +86,7 @@ is_community_umbrella_app("apps/emqx_bridge_clickhouse") -> false; is_community_umbrella_app("apps/emqx_bridge_dynamo") -> false; is_community_umbrella_app("apps/emqx_bridge_hstreamdb") -> false; is_community_umbrella_app("apps/emqx_bridge_influxdb") -> false; +is_community_umbrella_app("apps/emqx_bridge_iotdb") -> false; is_community_umbrella_app("apps/emqx_bridge_matrix") -> false; is_community_umbrella_app("apps/emqx_bridge_mongodb") -> false; is_community_umbrella_app("apps/emqx_bridge_mysql") -> false; @@ -463,6 +464,7 @@ relx_apps_per_edition(ee) -> emqx_bridge_dynamo, emqx_bridge_hstreamdb, emqx_bridge_influxdb, + emqx_bridge_iotdb, emqx_bridge_matrix, emqx_bridge_mongodb, emqx_bridge_mysql, diff --git a/rel/i18n/emqx_bridge_iotdb.hocon b/rel/i18n/emqx_bridge_iotdb.hocon new file mode 100644 index 000000000..d1ceecbe0 --- /dev/null +++ b/rel/i18n/emqx_bridge_iotdb.hocon @@ -0,0 +1,77 @@ +emqx_bridge_iotdb { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the IoTDB REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the IoTDB REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external IoTDB service's REST interface.""" +config_base_url.label: +"""IoTDB REST Service Base URL""" + +config_is_aligned.desc: +"""Whether to align the timeseries""" + +config_is_aligned.label: +"""Align Timeseries""" + +config_device_id.desc: +"""A fixed device name this data should be inserted for. If empty it must either be set in the rule action, the message itself, or it will be extracted from the topic.""" +config_device_id.label: +"""Device ID""" + +config_iotdb_version.desc: +"""The version of the IoTDB system to connect to.""" +config_iotdb_version.label: +"""IoTDB Version""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +config_request_timeout.desc: +"""HTTP request timeout.""" + +config_request_timeout.label: +"""HTTP Request Timeout""" + +desc_config.desc: +"""Configuration for Apache IoTDB bridge.""" + +desc_config.label: +"""IoTDB Bridge Configuration""" + +desc_name.desc: +"""Bridge name, used as a human-readable description of the bridge.""" + +desc_name.label: +"""Bridge Name""" +} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 89082a4bd..62f616576 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -197,6 +197,9 @@ for dep in ${CT_DEPS}; do oracle) FILES+=( '.ci/docker-compose-file/docker-compose-oracle.yaml' ) ;; + iotdb) + FILES+=( '.ci/docker-compose-file/docker-compose-iotdb.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index a9afcf6ca..bdbd77b3b 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -32,6 +32,7 @@ GCM HMAC HOCON HTTPS +IoTDB JSON JWK JWKs @@ -235,6 +236,7 @@ sysmem sysmon tcp ticktime +timeseries tlog tls tlsv