diff --git a/.ci/docker-compose-file/docker-compose-hstreamdb.yaml b/.ci/docker-compose-file/docker-compose-hstreamdb.yaml new file mode 100644 index 000000000..f3c4dbd4c --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-hstreamdb.yaml @@ -0,0 +1,123 @@ +version: "3.5" + +services: + hserver: + image: hstreamdb/hstream:v0.15.0 + container_name: hstreamdb + depends_on: + - zookeeper + - hstore + # ports: + # - "127.0.0.1:6570:6570" + expose: + - 6570 + networks: + - emqx_bridge + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - /tmp:/tmp + - data_store:/data/store + command: + - bash + - "-c" + - | + set -e + /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \ + /usr/local/bin/hstream-server \ + --bind-address 0.0.0.0 --port 6570 \ + --internal-port 6571 \ + --server-id 100 \ + --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \ + --advertised-address $$(hostname -I | awk '{print $$1}') \ + --metastore-uri zk://zookeeper:2181 \ + --store-config /data/store/logdevice.conf \ + --store-admin-host hstore --store-admin-port 6440 \ + --store-log-level warning \ + --io-tasks-path /tmp/io/tasks \ + --io-tasks-network emqx_bridge + + hstore: + image: hstreamdb/hstream:v0.15.0 + networks: + - emqx_bridge + volumes: + - data_store:/data/store + command: + - bash + - "-c" + - | + set -ex + # N.B. "enable-dscp-reflection=false" is required for linux kernel which + # doesn't support dscp reflection, e.g. centos7. + /usr/local/bin/ld-dev-cluster --root /data/store \ + --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \ + --user-admin-port 6440 \ + --param enable-dscp-reflection=false \ + --no-interactive + + zookeeper: + image: zookeeper + expose: + - 2181 + networks: + - emqx_bridge + volumes: + - data_zk_data:/data + - data_zk_datalog:/datalog + + ## The three container `hstream-exporter`, `prometheus`, `console` + ## is for HStreamDB Web Console + ## But HStreamDB Console is not supported in v0.15.0 + ## because of HStreamApi proto changed + # hstream-exporter: + # depends_on: + # hserver: + # condition: service_completed_successfully + # image: hstreamdb/hstream-exporter + # networks: + # - hstream-quickstart + # command: + # - bash + # - "-c" + # - | + # set -ex + # hstream-exporter --addr hstream://hserver:6570 + + # prometheus: + # image: prom/prometheus + # expose: + # - 9097 + # networks: + # - hstream-quickstart + # ports: + # - "9097:9090" + # volumes: + # - $PWD/prometheus:/etc/prometheus + + # console: + # image: hstreamdb/hstream-console + # depends_on: + # - hserver + # expose: + # - 5177 + # networks: + # - hstream-quickstart + # environment: + # - SERVER_PORT=5177 + # - PROMETHEUS_URL=http://prometheus:9097 + # - HSTREAM_PUBLIC_ADDRESS=hstream.example.com + # - HSTREAM_PRIVATE_ADDRESS=hserver:6570 + # ports: + # - "5177:5177" + +# networks: +# hstream-quickstart: +# name: hstream-quickstart + +volumes: + data_store: + name: quickstart_data_store + data_zk_data: + name: quickstart_data_zk_data + data_zk_datalog: + name: quickstart_data_zk_datalog diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index f15e779db..c0c88aef0 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -43,10 +43,12 @@ services: - 19000:19000 # S3 TLS - 19100:19100 - # IOTDB + # IOTDB (3 total) - 14242:4242 - 28080:18080 - 38080:38080 + # HStreamDB + - 15670:5670 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 87878ac92..d5576108f 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -155,5 +155,11 @@ "listen": "0.0.0.0:8085", "upstream": "gcp_emulator:8085", "enabled": true + }, + { + "name": "hstreamdb", + "listen": "0.0.0.0:6570", + "upstream": "hstreamdb:6570", + "enabled": true } ] diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 07711da12..11d199c9d 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -8,6 +8,7 @@ kernel, stdlib, emqx, + emqx_resource, emqx_connector ]}, {env, []}, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 57933029d..a71315a27 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -175,14 +175,14 @@ bridge_info_examples(Method) -> value => info_example(mqtt, Method) } }, - ee_bridge_examples(Method) + emqx_enterprise_bridge_examples(Method) ). -if(?EMQX_RELEASE_EDITION == ee). -ee_bridge_examples(Method) -> - emqx_ee_bridge:examples(Method). +emqx_enterprise_bridge_examples(Method) -> + emqx_bridge_enterprise:examples(Method). -else. -ee_bridge_examples(_Method) -> #{}. +emqx_enterprise_bridge_examples(_Method) -> #{}. -endif. info_example(Type, Method) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 59c94cef7..3bae55090 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -31,7 +31,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), - ok = start_ee_apps(), + ok = ensure_enterprise_schema_loaded(), ok = emqx_bridge:load(), ok = emqx_bridge:load_hook(), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), @@ -46,11 +46,11 @@ stop(_State) -> ok. -if(?EMQX_RELEASE_EDITION == ee). -start_ee_apps() -> - {ok, _} = application:ensure_all_started(emqx_ee_bridge), +ensure_enterprise_schema_loaded() -> + _ = emqx_bridge_enterprise:module_info(), ok. -else. -start_ee_apps() -> +ensure_enterprise_schema_loaded() -> ok. -endif. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index db8669f49..203a65072 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -64,7 +64,7 @@ bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_connector_http; -bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType). +bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType). -else. bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl similarity index 97% rename from lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl rename to apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 66f0dc3b4..e76d1af37 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -1,7 +1,9 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_bridge). +-module(emqx_bridge_enterprise). + +-if(?EMQX_RELEASE_EDITION == ee). -include_lib("hocon/include/hoconsc.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -30,7 +32,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"), api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"), api_ref(emqx_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"), - api_ref(emqx_ee_bridge_hstreamdb, <<"hstreamdb">>, Method), + api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method), api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"), api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"), api_ref(emqx_bridge_redis, <<"redis_single">>, Method ++ "_single"), @@ -54,7 +56,7 @@ schema_modules() -> [ emqx_bridge_kafka, emqx_bridge_cassandra, - emqx_ee_bridge_hstreamdb, + emqx_bridge_hstreamdb, emqx_bridge_gcp_pubsub, emqx_bridge_influxdb, emqx_bridge_mongodb, @@ -93,7 +95,7 @@ resource_type(kafka_consumer) -> emqx_bridge_kafka_impl_consumer; %% to hocon; keeping this as just `kafka' for backwards compatibility. resource_type(kafka) -> emqx_bridge_kafka_impl_producer; resource_type(cassandra) -> emqx_bridge_cassandra_connector; -resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; +resource_type(hstreamdb) -> emqx_bridge_hstreamdb_connector; resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_impl_producer; resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer; resource_type(mongodb_rs) -> emqx_bridge_mongodb_connector; @@ -123,7 +125,7 @@ fields(bridges) -> [ {hstreamdb, mk( - hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")), + hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config")), #{ desc => <<"HStreamDB Bridge Config">>, required => false @@ -365,3 +367,7 @@ rabbitmq_structs() -> api_ref(Module, Type, Method) -> {Type, ref(Module, Method)}. + +-else. + +-endif. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 03ae781ca..58be231e4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -57,7 +57,7 @@ api_schema(Method) -> {<<"mqtt">>, emqx_bridge_mqtt_schema} ] ], - EE = ee_api_schemas(Method), + EE = enterprise_api_schemas(Method), hoconsc:union(bridge_api_union(Broker ++ EE)). bridge_api_union(Refs) -> @@ -86,36 +86,23 @@ bridge_api_union(Refs) -> end. -if(?EMQX_RELEASE_EDITION == ee). -ee_api_schemas(Method) -> - ensure_loaded(emqx_ee_bridge, emqx_ee_bridge), - case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of - true -> emqx_ee_bridge:api_schemas(Method); +enterprise_api_schemas(Method) -> + case erlang:function_exported(emqx_bridge_enterprise, api_schemas, 1) of + true -> emqx_bridge_enterprise:api_schemas(Method); false -> [] end. -ee_fields_bridges() -> - ensure_loaded(emqx_ee_bridge, emqx_ee_bridge), - case erlang:function_exported(emqx_ee_bridge, fields, 1) of - true -> emqx_ee_bridge:fields(bridges); +enterprise_fields_bridges() -> + case erlang:function_exported(emqx_bridge_enterprise, fields, 1) of + true -> emqx_bridge_enterprise:fields(bridges); false -> [] end. -%% must ensure the app is loaded before checking if fn is defined. -ensure_loaded(App, Mod) -> - try - _ = application:load(App), - _ = Mod:module_info(), - ok - catch - _:_ -> - ok - end. - -else. -ee_api_schemas(_) -> []. +enterprise_api_schemas(_) -> []. -ee_fields_bridges() -> []. +enterprise_fields_bridges() -> []. -endif. @@ -191,7 +178,7 @@ fields(bridges) -> end } )} - ] ++ ee_fields_bridges(); + ] ++ enterprise_fields_bridges(); fields("metrics") -> [ {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index ea3495e0f..f449588cc 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,8 +1,14 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib, ecql]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge, + ecql + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index ad41329d2..2cbf0d6fe 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -396,7 +396,7 @@ conn_opts([Opt | Opts], Acc) -> %% prepare %% XXX: hardcode -%% note: the `cql` param is passed by emqx_ee_bridge_cassa +%% note: the `cql` param is passed by emqx_bridge_cassandra parse_prepare_cql(#{cql := SQL}) -> parse_prepare_cql([{send_message, SQL}], #{}, #{}); parse_prepare_cql(_) -> diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index fb16dd749..9df219296 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -170,9 +170,8 @@ common_init(Config0) -> ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % Connect to cassnadra directly and create the table catch connect_and_drop_table(Config0), diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl index 452db33a7..bceae1fd2 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl @@ -56,7 +56,6 @@ init_per_suite(Config) -> ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(emqx_ee_connector), %% keyspace `mqtt` must be created in advance {ok, Conn} = ecql:connect([ @@ -79,8 +78,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector), - _ = application:stop(emqx_ee_connector). + _ = application:stop(emqx_connector). init_per_testcase(_, Config) -> Config. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index 58a92fde4..cfb08f47b 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,8 +1,14 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, - {applications, [kernel, stdlib, clickhouse, emqx_resource]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge, + clickhouse + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index d0164b57c..98c524913 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -469,7 +469,7 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> reason => ClickhouseErrorResult }), case is_recoverable_error(ClickhouseErrorResult) of - %% TODO: The hackeny errors that the clickhouse library forwards are + %% TODO: The hackney errors that the clickhouse library forwards are %% very loosely defined. We should try to make sure that the following %% handles all error cases that we need to handle as recoverable_error true -> diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl index 787fb81ff..b1a560442 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl @@ -12,7 +12,7 @@ -include_lib("emqx_connector/include/emqx_connector.hrl"). %% See comment in -%% lib-ee/emqx_ee_connector/test/ee_bridge_clickhouse_connector_SUITE.erl for how to +%% apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl for how to %% run this without bringing up the whole CI infrastucture %%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index 0e202b714..824f5ee7b 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,8 +1,14 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib, erlcloud]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge, + erlcloud + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index ac2b59229..9490e6455 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -88,7 +88,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_conf, erlcloud]), ok. init_per_testcase(TestCase, Config) -> @@ -128,10 +128,12 @@ common_init(ConfigT) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + % Ensure enterprise bridge module is loaded + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, emqx_resource, emqx_bridge + ]), + _ = application:ensure_all_started(erlcloud), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % setup dynamo setup_dynamo(Config0), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 85bbfdd8c..bf5510366 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,10 +1,12 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, stdlib, + emqx_resource, + emqx_bridge, ehttpc ]}, {env, []}, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index 890a3faed..8ef369068 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -21,7 +21,7 @@ service_account_json_converter/1 ]). -%% emqx_ee_bridge "unofficial" API +%% emqx_bridge_enterprise "unofficial" API -export([conn_bridge_examples/1]). -type service_account_json() :: map(). diff --git a/lib-ee/emqx_ee_connector/docker-ct b/apps/emqx_bridge_hstreamdb/docker-ct similarity index 50% rename from lib-ee/emqx_ee_connector/docker-ct rename to apps/emqx_bridge_hstreamdb/docker-ct index ef579c036..d25a92b6b 100644 --- a/lib-ee/emqx_ee_connector/docker-ct +++ b/apps/emqx_bridge_hstreamdb/docker-ct @@ -1,2 +1,2 @@ toxiproxy -influxdb +hstreamdb diff --git a/apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl b/apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl new file mode 100644 index 000000000..6b99c507a --- /dev/null +++ b/apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl @@ -0,0 +1,5 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-define(HSTREAMDB_DEFAULT_PORT, 6570). diff --git a/lib-ee/emqx_ee_connector/rebar.config b/apps/emqx_bridge_hstreamdb/rebar.config similarity index 75% rename from lib-ee/emqx_ee_connector/rebar.config rename to apps/emqx_bridge_hstreamdb/rebar.config index ee1d4e500..9a70b55f9 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/apps/emqx_bridge_hstreamdb/rebar.config @@ -1,11 +1,11 @@ %% -*- mode: erlang -*- {erl_opts, [debug_info]}. {deps, [ - {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.3.1+v0.12.0"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}} ]}. {shell, [ - {apps, [emqx_ee_connector]} + {apps, [emqx_bridge_hstreamdb]} ]}. diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index 1cb3742b3..2b1e96b00 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,8 +1,13 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, - {applications, [kernel, stdlib]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl new file mode 100644 index 000000000..7052e0120 --- /dev/null +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl @@ -0,0 +1,109 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_hstreamdb). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"hstreamdb">> => #{ + summary => <<"HStreamDB Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + values(post); +values(put) -> + values(post); +values(post) -> + #{ + type => <<"hstreamdb">>, + name => <<"demo">>, + direction => <<"egress">>, + url => <<"http://127.0.0.1:6570">>, + stream => <<"stream">>, + %% raw HRecord + record_template => + <<"{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }">>, + pool_size => 8, + %% grpc_timeout => <<"1m">> + resource_opts => #{ + query_mode => sync, + batch_size => 100, + batch_time => <<"20ms">> + }, + ssl => #{enable => false} + }; +values(_) -> + #{}. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_hstreamdb". + +roots() -> []. + +fields("config") -> + hstream_bridge_common_fields() ++ + connector_fields(); +fields("post") -> + hstream_bridge_common_fields() ++ + connector_fields() ++ + type_name_fields(); +fields("get") -> + hstream_bridge_common_fields() ++ + connector_fields() ++ + type_name_fields() ++ + emqx_bridge_schema:status_fields(); +fields("put") -> + hstream_bridge_common_fields() ++ + connector_fields(). + +hstream_bridge_common_fields() -> + emqx_bridge_schema:common_bridge_fields() ++ + [ + {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, + {record_template, + mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})} + ] ++ + emqx_resource_schema:fields("resource_opts"). + +connector_fields() -> + emqx_bridge_hstreamdb_connector:fields(config). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal +type_name_fields() -> + [ + {type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl similarity index 57% rename from lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl rename to apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index 70eca83d7..16092f262 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -1,11 +1,12 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ee_connector_hstreamdb). +-module(emqx_bridge_hstreamdb_connector). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -import(hoconsc, [mk/2, enum/1]). @@ -17,6 +18,7 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -28,10 +30,15 @@ namespace/0, roots/0, fields/1, - desc/1, - connector_examples/1 + desc/1 ]). +%% Allocatable resources +-define(hstreamdb_client, hstreamdb_client). + +-define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). +-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> always_sync. @@ -39,25 +46,52 @@ callback_mode() -> always_sync. on_start(InstId, Config) -> start_client(InstId, Config). -on_stop(InstId, #{client := Client, producer := Producer}) -> - StopClientRes = hstreamdb:stop_client(Client), - StopProducerRes = hstreamdb:stop_producer(Producer), - ?SLOG(info, #{ - msg => "stop hstreamdb connector", - connector => InstId, - client => Client, - producer => Producer, - stop_client => StopClientRes, - stop_producer => StopProducerRes - }). +on_stop(InstId, _State) -> + case emqx_resource:get_allocated_resources(InstId) of + #{client := Client, producer := Producer} -> + StopClientRes = hstreamdb:stop_client(Client), + StopProducerRes = hstreamdb:stop_producer(Producer), + ?SLOG(info, #{ + msg => "stop hstreamdb connector", + connector => InstId, + client => Client, + producer => Producer, + stop_client => StopClientRes, + stop_producer => StopProducerRes + }); + _ -> + ok + end. + +-define(FAILED_TO_APPLY_HRECORD_TEMPLATE, + {error, {unrecoverable_error, failed_to_apply_hrecord_template}} +). on_query( _InstId, {send_message, Data}, - #{producer := Producer, ordering_key := OrderingKey, payload := Payload} + _State = #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } ) -> - Record = to_record(OrderingKey, Payload, Data), - do_append(Producer, Record). + try to_record(PartitionKey, HRecordTemplate, Data) of + Record -> append_record(Producer, Record) + catch + _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE + end. + +on_batch_query( + _InstId, + BatchList, + _State = #{ + producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate + } +) -> + try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of + Records -> append_record(Producer, Records) + catch + _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE + end. on_get_status(_InstId, #{client := Client}) -> case is_alive(Client) of @@ -87,43 +121,16 @@ fields(config) -> [ {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, - {ordering_key, mk(binary(), #{required => false, desc => ?DESC("ordering_key")})}, - {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} - ]; -fields("get") -> - fields("post"); -fields("put") -> - fields(config); -fields("post") -> - [ - {type, mk(hstreamdb, #{required => true, desc => ?DESC("type")})}, - {name, mk(binary(), #{required => true, desc => ?DESC("name")})} - ] ++ fields("put"). + {partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})}, + {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}, + {grpc_timeout, fun grpc_timeout/1} + ] ++ emqx_connector_schema_lib:ssl_fields(). -connector_examples(Method) -> - [ - #{ - <<"hstreamdb">> => #{ - summary => <<"HStreamDB Connector">>, - value => values(Method) - } - } - ]. - -values(post) -> - maps:merge(values(put), #{name => <<"connector">>}); -values(get) -> - values(post); -values(put) -> - #{ - type => hstreamdb, - url => <<"http://127.0.0.1:6570">>, - stream => <<"stream1">>, - ordering_key => <<"some_key">>, - pool_size => 8 - }; -values(_) -> - #{}. +grpc_timeout(type) -> emqx_schema:timeout_duration_ms(); +grpc_timeout(desc) -> ?DESC("grpc_timeout"); +grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW; +grpc_timeout(required) -> false; +grpc_timeout(_) -> undefined. desc(config) -> ?DESC("config"). @@ -168,6 +175,10 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> }), start_producer(InstId, Client, Config); _ -> + ?tp( + hstreamdb_connector_start_failed, + #{error => client_not_alive} + ), ?SLOG(error, #{ msg => "hstreamdb connector: client not alive", connector => InstId @@ -202,7 +213,7 @@ is_alive(Client) -> start_producer( InstId, Client, - Options = #{stream := Stream, pool_size := PoolSize, egress := #{payload := PayloadBin}} + Options = #{stream := Stream, pool_size := PoolSize} ) -> %% TODO: change these batch options after we have better disk cache. BatchSize = maps:get(batch_size, Options, 100), @@ -212,7 +223,8 @@ start_producer( {callback, {?MODULE, on_flush_result, []}}, {max_records, BatchSize}, {interval, Interval}, - {pool_size, PoolSize} + {pool_size, PoolSize}, + {grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)} ], Name = produce_name(InstId), ?SLOG(info, #{ @@ -224,17 +236,18 @@ start_producer( ?SLOG(info, #{ msg => "hstreamdb connector: producer started" }), - EnableBatch = maps:get(enable_batch, Options, false), - Payload = emqx_placeholder:preproc_tmpl(PayloadBin), - OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), - OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin), State = #{ client => Client, producer => Producer, - enable_batch => EnableBatch, - ordering_key => OrderingKey, - payload => Payload + enable_batch => maps:get(enable_batch, Options, false), + partition_key => emqx_placeholder:preproc_tmpl( + maps:get(partition_key, Options, <<"">>) + ), + record_template => record_template(Options) }, + ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{ + client => Client, producer => Producer + }), {ok, State}; {error, {already_started, Pid}} -> ?SLOG(info, #{ @@ -253,47 +266,53 @@ start_producer( {error, Reason} end. -to_record(OrderingKeyTmpl, PayloadTmpl, Data) -> - OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data), - Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data), - to_record(OrderingKey, Payload). +to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> + PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), + RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data), + to_record(PartitionKey, RawRecord). -to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> - to_record(binary_to_list(OrderingKey), Payload); -to_record(OrderingKey, Payload) -> - hstreamdb:to_record(OrderingKey, raw, Payload). +to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) -> + to_record(binary_to_list(PartitionKey), RawRecord); +to_record(PartitionKey, RawRecord) -> + hstreamdb:to_record(PartitionKey, raw, RawRecord). -do_append(Producer, Record) -> - do_append(false, Producer, Record). +to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> + Records0 = lists:map( + fun({send_message, Data}) -> + to_record(PartitionKeyTmpl, HRecordTmpl, Data) + end, + BatchList + ), + PartitionKeys = proplists:get_keys(Records0), + [ + {PartitionKey, proplists:get_all_values(PartitionKey, Records0)} + || PartitionKey <- PartitionKeys + ]. -%% TODO: this append is async, remove or change it after we have better disk cache. -% do_append(true, Producer, Record) -> -% case hstreamdb:append(Producer, Record) of -% ok -> -% ?SLOG(debug, #{ -% msg => "hstreamdb producer async append success", -% record => Record -% }); -% {error, Reason} = Err -> -% ?SLOG(error, #{ -% msg => "hstreamdb producer async append failed", -% reason => Reason, -% record => Record -% }), -% Err -% end; -do_append(false, Producer, Record) -> - %% TODO: this append is sync, but it does not support [Record], can only append one Record. - %% Change it after we have better dick cache. +append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) -> + lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords); +append_record(Producer, Record) when is_tuple(Record) -> + do_append_records(false, Producer, Record). + +%% TODO: only sync request supported. implement async request later. +do_append_records(false, Producer, Record) -> case hstreamdb:append_flush(Producer, Record) of - {ok, _} -> + {ok, _Result} -> + ?tp( + hstreamdb_connector_query_return, + #{result => _Result} + ), ?SLOG(debug, #{ - msg => "hstreamdb producer sync append success", + msg => "HStreamDB producer sync append success", record => Record }); {error, Reason} = Err -> + ?tp( + hstreamdb_connector_query_return, + #{error => Reason} + ), ?SLOG(error, #{ - msg => "hstreamdb producer sync append failed", + msg => "HStreamDB producer sync append failed", reason => Reason, record => Record }), @@ -306,6 +325,11 @@ client_name(InstId) -> produce_name(ActionId) -> list_to_atom("producer:" ++ to_string(ActionId)). +record_template(#{record_template := RawHRecordTemplate}) -> + emqx_placeholder:preproc_tmpl(RawHRecordTemplate); +record_template(_) -> + emqx_placeholder:preproc_tmpl(<<"${payload}">>). + to_string(List) when is_list(List) -> List; to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin); to_string(Atom) when is_atom(Atom) -> atom_to_list(Atom). diff --git a/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl new file mode 100644 index 000000000..430343274 --- /dev/null +++ b/apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl @@ -0,0 +1,578 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_hstreamdb_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_bridge_hstreamdb.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +% SQL definitions +-define(STREAM, "stream"). +-define(REPLICATION_FACTOR, 1). +%% in seconds +-define(BACKLOG_RETENTION_SECOND, (24 * 60 * 60)). +-define(SHARD_COUNT, 1). + +-define(BRIDGE_NAME, <<"hstreamdb_demo_bridge">>). +-define(RECORD_TEMPLATE, + "{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }" +). + +-define(POOL_SIZE, 8). +-define(BATCH_SIZE, 10). +-define(GRPC_TIMEOUT, "1s"). + +-define(WORKER_POOL_SIZE, 4). + +-define(WITH_CLIENT(Process), + Client = connect_direct_hstream(_Name = test_c, Config), + Process, + ok = disconnect(Client) +). + +%% How to run it locally (all commands are run in $PROJ_ROOT dir): +%% A: run ct on host +%% 1. Start all deps services +%% ```bash +%% sudo docker compose -f .ci/docker-compose-file/docker-compose.yaml \ +%% -f .ci/docker-compose-file/docker-compose-hstreamdb.yaml \ +%% -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ +%% up --build +%% ``` +%% +%% 2. Run use cases with special environment variables +%% 6570 is toxiproxy exported port. +%% Local: +%% ```bash +%% HSTREAMDB_HOST=$REAL_TOXIPROXY_IP HSTREAMDB_PORT=6570 \ +%% PROXY_HOST=$REAL_TOXIPROXY_IP PROXY_PORT=6570 \ +%% ./rebar3 as test ct -c -v --readable true --name ct@127.0.0.1 \ +%% --suite apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +%% ``` +%% +%% B: run ct in docker container +%% run script: +%% ```bash +%% ./scripts/ct/run.sh --ci --app apps/emqx_bridge_hstreamdb/ -- \ +%% --name 'test@127.0.0.1' -c -v --readable true \ +%% --suite apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl +%% ```` + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, sync} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + NonBatchCases = [t_write_timeout], + BatchingGroups = [{group, with_batch}, {group, without_batch}], + [ + {sync, BatchingGroups}, + {with_batch, TCs -- NonBatchCases}, + {without_batch, TCs} + ]. + +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; +init_per_group(with_batch, Config0) -> + Config = [{enable_batch, true} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{enable_batch, false} | Config0], + common_init(Config); +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> + connect_and_delete_stream(Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_conf, hstreamdb_erl]), + ok. + +init_per_testcase(t_to_hrecord_failed, Config) -> + meck:new([hstreamdb], [passthrough, no_history, no_link]), + meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end), + Config; +init_per_testcase(_Testcase, Config) -> + %% drop stream and will create a new one in common_init/1 + %% TODO: create a new stream for each test case + delete_bridge(Config), + snabbkaffe:start_trace(), + Config. + +end_per_testcase(t_to_hrecord_failed, _Config) -> + meck:unload([hstreamdb]); +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = snabbkaffe:stop(), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Data = rand_data(), + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, Data)), + #{?snk_kind := hstreamdb_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(hstreamdb_connector_query_return, Trace0), + lists:foreach( + fun(EachTrace) -> + ?assertMatch(#{result := #{streamName := <>}}, EachTrace) + end, + Trace + ), + ok + end + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + BridgeType = ?config(hstreamdb_bridge_type, Config), + Name = ?config(hstreamdb_name, Config), + HStreamDBConfig0 = ?config(hstreamdb_config, Config), + HStreamDBConfig = HStreamDBConfig0#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(HStreamDBConfig) + ), + Data = rand_data(), + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, Data)), + #{?snk_kind := hstreamdb_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace) -> + ?assertMatch( + [#{result := #{streamName := <>}}], + ?of_kind(hstreamdb_connector_query_return, Trace) + ) + end + ), + ok. + +t_get_status(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + + health_check_resource_ok(Config), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + health_check_resource_down(Config) + end), + ok. + +t_create_disconnected(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + + ?check_trace( + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch({ok, _}, create_bridge(Config)) + end), + fun(Trace) -> + ?assertMatch( + [#{error := client_not_alive}], + ?of_kind(hstreamdb_connector_start_failed, Trace) + ), + ok + end + ), + %% TODO: Investigate why reconnection takes at least 5 seconds during ct. + %% While in practical applications, recovers to the 'connected' state + %% within 3 seconds after toxiproxy being enabled.'" + %% timer:sleep(10000), + restart_resource(Config), + health_check_resource_ok(Config), + ok. + +t_write_failure(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + QueryMode = ?config(query_mode, Config), + Data = rand_data(), + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := resource_connected_enter}, + 20_000 + ), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + health_check_resource_down(Config), + case QueryMode of + sync -> + ?assertMatch( + {error, {resource_error, #{msg := "call resource timeout", reason := timeout}}}, + send_message(Config, Data) + ); + async -> + %% TODO: async mode is not supported yet, + %% but it will return ok if calling emqx_resource_buffer_worker:async_query/3, + ?assertMatch( + ok, + send_message(Config, Data) + ) + end + end), + ok. + +t_simple_query(Config) -> + BatchSize = batch_size(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Requests = gen_batch_req(BatchSize), + ?check_trace( + begin + ?wait_async_action( + lists:foreach( + fun(Request) -> + ?assertEqual(ok, query_resource(Config, Request)) + end, + Requests + ), + #{?snk_kind := hstreamdb_connector_query_return}, + 10_000 + ) + end, + fun(Trace0) -> + Trace = ?of_kind(hstreamdb_connector_query_return, Trace0), + lists:foreach( + fun(EachTrace) -> + ?assertMatch(#{result := #{streamName := <>}}, EachTrace) + end, + Trace + ), + ok + end + ), + ok. + +t_to_hrecord_failed(Config) -> + QueryMode = ?config(query_mode, Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Result = send_message(Config, #{}), + case QueryMode of + sync -> + ?assertMatch( + {error, {unrecoverable_error, failed_to_apply_hrecord_template}}, + Result + ) + %% TODO: async mode is not supported yet + end, + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +common_init(ConfigT) -> + Host = os:getenv("HSTREAMDB_HOST", "toxiproxy"), + RawPort = os:getenv("HSTREAMDB_PORT", str(?HSTREAMDB_DEFAULT_PORT)), + Port = list_to_integer(RawPort), + URL = "http://" ++ Host ++ ":" ++ RawPort, + + Config0 = [ + {hstreamdb_host, Host}, + {hstreamdb_port, Port}, + {hstreamdb_url, URL}, + %% see also for `proxy_name` : $PROJ_ROOT/.ci/docker-compose-file/toxiproxy.json + {proxy_name, "hstreamdb"}, + {batch_size, batch_size(ConfigT)} + | ConfigT + ], + + BridgeType = proplists:get_value(bridge_type, Config0, <<"hstreamdb">>), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + % Ensure EE bridge module is loaded + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_resource, emqx_bridge]), + _ = application:ensure_all_started(hstreamdb_erl), + _ = emqx_bridge_enterprise:module_info(), + emqx_mgmt_api_test_util:init_suite(), + % Connect to hstreamdb directly + % drop old stream and then create new one + connect_and_delete_stream(Config0), + connect_and_create_stream(Config0), + {Name, HStreamDBConf} = hstreamdb_config(BridgeType, Config0), + Config = + [ + {hstreamdb_config, HStreamDBConf}, + {hstreamdb_bridge_type, BridgeType}, + {hstreamdb_name, Name}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort} + | Config0 + ], + Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_hstreamdb); + _ -> + {skip, no_hstreamdb} + end + end. + +hstreamdb_config(BridgeType, Config) -> + Port = integer_to_list(?config(hstreamdb_port, Config)), + URL = "http://" ++ ?config(hstreamdb_host, Config) ++ ":" ++ Port, + Name = ?BRIDGE_NAME, + BatchSize = batch_size(Config), + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " url = ~p\n" + " stream = ~p\n" + " record_template = ~p\n" + " pool_size = ~p\n" + " grpc_timeout = ~p\n" + " resource_opts = {\n" + %% always sync + " query_mode = sync\n" + " request_ttl = 500ms\n" + " batch_size = ~b\n" + " worker_pool_size = ~b\n" + " }\n" + "}", + [ + BridgeType, + Name, + URL, + ?STREAM, + ?RECORD_TEMPLATE, + ?POOL_SIZE, + ?GRPC_TIMEOUT, + BatchSize, + ?WORKER_POOL_SIZE + ] + ), + {Name, parse_and_check(ConfigString, BridgeType, Name)}. + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + Config. + +-define(RPC_OPTIONS, #{pool_size => 4}). + +-define(CONN_ATTEMPTS, 10). + +default_options(Config) -> + [ + {url, ?config(hstreamdb_url, Config)}, + {rpc_options, ?RPC_OPTIONS} + ]. + +connect_direct_hstream(Name, Config) -> + client(Name, Config, ?CONN_ATTEMPTS). + +client(_Name, _Config, N) when N =< 0 -> error(cannot_connect); +client(Name, Config, N) -> + try + _ = hstreamdb:stop_client(Name), + {ok, Client} = hstreamdb:start_client(Name, default_options(Config)), + {ok, echo} = hstreamdb:echo(Client), + Client + catch + Class:Error -> + ct:print("Error connecting: ~p", [{Class, Error}]), + ct:sleep(timer:seconds(1)), + client(Name, Config, N - 1) + end. + +disconnect(Client) -> + hstreamdb:stop_client(Client). + +create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> + BridgeType = ?config(hstreamdb_bridge_type, Config), + Name = ?config(hstreamdb_name, Config), + HSDBConfig0 = ?config(hstreamdb_config, Config), + HSDBConfig = emqx_utils_maps:deep_merge(HSDBConfig0, Overrides), + emqx_bridge:create(BridgeType, Name, HSDBConfig). + +delete_bridge(Config) -> + BridgeType = ?config(hstreamdb_bridge_type, Config), + Name = ?config(hstreamdb_name, Config), + emqx_bridge:remove(BridgeType, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(Config, Data) -> + Name = ?config(hstreamdb_name, Config), + BridgeType = ?config(hstreamdb_bridge_type, Config), + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), + emqx_bridge:send_message(BridgeID, Data). + +query_resource(Config, Request) -> + Name = ?config(hstreamdb_name, Config), + BridgeType = ?config(hstreamdb_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + +restart_resource(Config) -> + BridgeName = ?config(hstreamdb_name, Config), + BridgeType = ?config(hstreamdb_bridge_type, Config), + emqx_bridge:disable_enable(disable, BridgeType, BridgeName), + timer:sleep(200), + emqx_bridge:disable_enable(enable, BridgeType, BridgeName). + +resource_id(Config) -> + BridgeName = ?config(hstreamdb_name, Config), + BridgeType = ?config(hstreamdb_bridge_type, Config), + _ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName). + +health_check_resource_ok(Config) -> + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))). + +health_check_resource_down(Config) -> + case emqx_resource_manager:health_check(resource_id(Config)) of + {ok, Status} when Status =:= disconnected orelse Status =:= connecting -> + ok; + {error, timeout} -> + ok; + Other -> + ?assert( + false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other])) + ) + end. + +% These funs start and then stop the hstreamdb connection +connect_and_create_stream(Config) -> + ?WITH_CLIENT( + _ = hstreamdb:create_stream( + Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT + ) + ), + %% force write to stream to make it created and ready to be written data for rest cases + ProducerOptions = [ + {pool_size, 4}, + {stream, ?STREAM}, + {callback, fun(_) -> ok end}, + {max_records, 10}, + {interval, 1000} + ], + ?WITH_CLIENT( + begin + {ok, Producer} = hstreamdb:start_producer(Client, test_producer, ProducerOptions), + _ = hstreamdb:append_flush(Producer, hstreamdb:to_record([], raw, rand_payload())), + _ = hstreamdb:stop_producer(Producer) + end + ). + +connect_and_delete_stream(Config) -> + ?WITH_CLIENT( + _ = hstreamdb:delete_stream(Client, ?STREAM) + ). + +%%-------------------------------------------------------------------- +%% help functions +%%-------------------------------------------------------------------- + +batch_size(Config) -> + case ?config(enable_batch, Config) of + true -> ?BATCH_SIZE; + false -> 1 + end. + +rand_data() -> + #{ + %% Raw MTTT Payload in binary + payload => rand_payload(), + id => <<"0005F8F84FFFAFB9F44200000D810002">>, + topic => <<"test/topic">>, + qos => 0 + }. + +rand_payload() -> + emqx_utils_json:encode(#{ + temperature => rand:uniform(40), humidity => rand:uniform(100) + }). + +gen_batch_req(Count) when + is_integer(Count) andalso Count > 0 +-> + [{send_message, rand_data()} || _Val <- lists:seq(1, Count)]; +gen_batch_req(Count) -> + ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]). + +str(List) when is_list(List) -> + unicode:characters_to_list(List, utf8); +str(Bin) when is_binary(Bin) -> + unicode:characters_to_list(Bin, utf8); +str(Num) when is_number(Num) -> + number_to_list(Num). + +number_to_list(Int) when is_integer(Int) -> + integer_to_list(Int); +number_to_list(Float) when is_float(Float) -> + float_to_list(Float, [{decimals, 10}, compact]). diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index 80b708582..71b95a40d 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,8 +1,14 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib, influxdb]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge, + influxdb + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index a3e4f1eb3..869656dbd 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_impl @@ -10,6 +10,9 @@ {applications, [ kernel, stdlib, + emqx_resource, + emqx_bridge, + %% for module emqx_connector_http emqx_connector ]}, {env, []}, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 724c3f43a..629ac0885 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -18,7 +18,7 @@ desc/1 ]). -%% emqx_ee_bridge "unofficial" API +%% emqx_bridge_enterprise "unofficial" API -export([conn_bridge_examples/1]). %%------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 59c26717e..87c1841e5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,10 +1,13 @@ +%% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, stdlib, + emqx_resource, + emqx_bridge, telemetry, wolff, brod, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 4fa188c95..bcdeaf870 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -40,7 +40,7 @@ query_mode(_) -> callback_mode() -> async_if_possible. -%% @doc Config schema is defined in emqx_ee_bridge_kafka. +%% @doc Config schema is defined in emqx_bridge_kafka. on_start(InstId, Config) -> #{ authentication := Auth, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 38d58c1e7..95dec2db0 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -73,11 +73,9 @@ wait_until_kafka_is_up(Attempts) -> end. init_per_suite(Config) -> - %% ensure loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), - application:load(emqx_bridge), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), + %% Ensure enterprise bridge module is loaded + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + _ = emqx_bridge_enterprise:module_info(), ok = emqx_connector_test_helpers:start_apps(?APPS), {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 7dfe7eae6..42129bfc7 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -1,8 +1,13 @@ {application, emqx_bridge_matrix, [ {description, "EMQX Enterprise MatrixDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index b10c92aef..fa3ebd3c9 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mongodb, [ {description, "EMQX Enterprise MongoDB Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,6 @@ emqx_connector, emqx_resource, emqx_bridge, - emqx_ee_bridge, emqx_mongodb ]}, {env, []}, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index 72485815f..b108f654f 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -10,7 +10,7 @@ -behaviour(hocon_schema). -%% emqx_ee_bridge "callbacks" +%% emqx_bridge_enterprise "callbacks" -export([ conn_bridge_examples/1 ]). diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index eb0a22e9c..8c004d829 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -58,7 +58,7 @@ on_query(InstanceId, {send_message, Message0}, State) -> }, Message = render_message(PayloadTemplate, Message0), Res = emqx_mongodb:on_query(InstanceId, {send_message, Message}, NewConnectorState), - ?tp(mongo_ee_connector_on_query_return, #{result => Res}), + ?tp(mongo_bridge_connector_on_query_return, #{result => Res}), Res; on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) -> emqx_mongodb:on_query(InstanceId, Request, ConnectorState). diff --git a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl index 89243bf8e..758124713 100644 --- a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl +++ b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl @@ -116,7 +116,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf, emqx_rule_engine]), + ok = emqx_common_test_helpers:stop_apps([emqx_mongodb, emqx_bridge, emqx_rule_engine, emqx_conf]), ok. init_per_testcase(_Testcase, Config) -> @@ -146,9 +146,8 @@ start_apps() -> ]). ensure_loaded() -> - _ = application:load(emqx_ee_bridge), _ = application:load(emqtt), - _ = emqx_ee_bridge:module_info(), + _ = emqx_bridge_enterprise:module_info(), ok. mongo_type(Config) -> @@ -354,7 +353,7 @@ t_setup_via_config_and_publish(Config) -> {ok, {ok, _}} = ?wait_async_action( send_message(Config, #{key => Val}), - #{?snk_kind := mongo_ee_connector_on_query_return}, + #{?snk_kind := mongo_bridge_connector_on_query_return}, 5_000 ), ?assertMatch( @@ -379,7 +378,7 @@ t_setup_via_http_api_and_publish(Config) -> {ok, {ok, _}} = ?wait_async_action( send_message(Config, #{key => Val}), - #{?snk_kind := mongo_ee_connector_on_query_return}, + #{?snk_kind := mongo_bridge_connector_on_query_return}, 5_000 ), ?assertMatch( @@ -395,7 +394,7 @@ t_payload_template(Config) -> {ok, {ok, _}} = ?wait_async_action( send_message(Config, #{key => Val, clientid => ClientId}), - #{?snk_kind := mongo_ee_connector_on_query_return}, + #{?snk_kind := mongo_bridge_connector_on_query_return}, 5_000 ), ?assertMatch( @@ -421,7 +420,7 @@ t_collection_template(Config) -> clientid => ClientId, mycollectionvar => <<"mycol">> }), - #{?snk_kind := mongo_ee_connector_on_query_return}, + #{?snk_kind := mongo_bridge_connector_on_query_return}, 5_000 ), ?assertMatch( diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src index 2e6844712..2ecdd6a6a 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src @@ -1,8 +1,15 @@ {application, emqx_bridge_mysql, [ {description, "EMQX Enterprise MySQL Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, emqx_mysql]}, + {applications, [ + kernel, + stdlib, + emqx_connector, + emqx_resource, + emqx_bridge, + emqx_mysql + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 2ca0f410d..3ed40e903 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -142,10 +142,9 @@ common_init(Config0) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + % Ensure enterprise bridge module is loaded ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, emqx_rule_engine]), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % Connect to mysql directly and create the table connect_and_create_table(Config0), diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index 9037b8840..6ec938afd 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -1,10 +1,12 @@ {application, emqx_bridge_opents, [ {description, "EMQX Enterprise OpenTSDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, stdlib, + emqx_resource, + emqx_bridge, opentsdb ]}, {env, []}, diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index 93224d5ca..3632ce786 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -53,7 +53,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok = emqx_common_test_helpers:stop_apps([opentsdb, emqx_bridge, emqx_resource, emqx_conf]), ok. init_per_testcase(_Testcase, Config) -> @@ -91,10 +91,12 @@ common_init(ConfigT) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + % Ensure enterprise bridge module is loaded + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, emqx_resource, emqx_bridge + ]), + _ = application:ensure_all_started(opentsdb), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), {Name, OpenTSConf} = opents_config(BridgeType, Config0), Config = diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src index ad96b4744..a05533da3 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -1,10 +1,12 @@ {application, emqx_bridge_oracle, [ {description, "EMQX Enterprise Oracle Database Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, stdlib, + emqx_resource, + emqx_bridge, emqx_oracle ]}, {env, []}, diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index d7c7cec74..5c6eddb39 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -83,8 +83,9 @@ common_init_per_group() -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - application:load(emqx_bridge), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), + %% Ensure enterprise bridge module is loaded + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + _ = emqx_bridge_enterprise:module_info(), ok = emqx_connector_test_helpers:start_apps(?APPS), {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index 5a72107a4..ade791a6d 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -1,8 +1,13 @@ {application, emqx_bridge_pgsql, [ {description, "EMQX Enterprise PostgreSQL Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 6806328d6..d16488bc6 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -145,10 +145,9 @@ common_init(Config0) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + % Ensure enterprise bridge module is loaded ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % Connect to pgsql directly and create the table connect_and_create_table(Config0), diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index 487e862bc..99fb25c33 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,10 +1,12 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, stdlib, + emqx_resource, + emqx_bridge, pulsar ]}, {env, []}, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 038da3e61..2fa5d70cf 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -15,7 +15,7 @@ fields/1, desc/1 ]). -%% emqx_ee_bridge "unofficial" API +%% emqx_bridge_enterprise "unofficial" API -export([conn_bridge_examples/1]). -export([producer_strategy_key_validator/1]). diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 4530748de..15d4b63d4 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -14,7 +14,7 @@ -import(emqx_common_test_helpers, [on_exit/1]). -define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>). --define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_pulsar]). +-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]). -define(RULE_TOPIC, "mqtt/rule"). -define(RULE_TOPIC_BIN, <>). @@ -122,9 +122,11 @@ common_init_per_group() -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - application:load(emqx_bridge), + %% Ensure enterprise bridge module is loaded ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps(?APPS), + ok = emqx_common_test_helpers:start_apps(?APPS), + {ok, _} = application:ensure_all_started(pulsar), + _ = emqx_bridge_enterprise:module_info(), {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), UniqueNum = integer_to_binary(erlang:unique_integer()), @@ -518,7 +520,7 @@ cluster(Config) -> Cluster = emqx_common_test_helpers:emqx_cluster( [core, core], [ - {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]}, + {apps, [emqx_conf] ++ ?APPS ++ [pulsar]}, {listener_ports, []}, {peer_mod, PeerModule}, {priv_data_dir, PrivDataDir}, @@ -1097,6 +1099,7 @@ do_t_cluster(Config) -> ), {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end), {ok, _} = snabbkaffe:receive_events(SRef1), + erpc:multicall(Nodes, fun wait_until_producer_connected/0), {ok, _} = snabbkaffe:block_until( ?match_n_events( NumNodes, @@ -1118,7 +1121,6 @@ do_t_cluster(Config) -> end, Nodes ), - erpc:multicall(Nodes, fun wait_until_producer_connected/0), Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload), ?tp(publishing_message, #{}), erpc:call(N2, emqx, publish, [Message0]), diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index b8f7b3327..e9ef4d524 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,8 +1,16 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge, + ecql, + rabbit_common, + amqp_client + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl index e6a6c03fb..d3f31f5fa 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl @@ -13,7 +13,7 @@ -include_lib("amqp_client/include/amqp_client.hrl"). %% See comment in -%% lib-ee/emqx_ee_connector/test/ee_connector_rabbitmq_SUITE.erl for how to +%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to %% run this without bringing up the whole CI infrastucture rabbit_mq_host() -> @@ -50,8 +50,6 @@ init_per_suite(Config) -> ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(emqx_ee_connector), - {ok, _} = application:ensure_all_started(emqx_ee_bridge), {ok, _} = application:ensure_all_started(amqp_client), emqx_mgmt_api_test_util:init_suite(), ChannelConnection = setup_rabbit_mq_exchange_and_queue(), @@ -112,7 +110,6 @@ end_per_suite(Config) -> ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), - _ = application:stop(emqx_ee_connector), _ = application:stop(emqx_bridge), %% Close the channel ok = amqp_channel:close(Channel), diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index 6b6ad617f..106a4d67b 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -48,7 +48,6 @@ init_per_suite(Config) -> ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(emqx_ee_connector), {ok, _} = application:ensure_all_started(amqp_client), ChannelConnection = setup_rabbit_mq_exchange_and_queue(), [{channel_connection, ChannelConnection} | Config]; diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 0375b6cd2..bc21adcad 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,8 +1,15 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, emqx_redis]}, + {applications, [ + kernel, + stdlib, + emqx_connector, + emqx_resource, + emqx_bridge, + emqx_redis + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 046c42180..38a80048e 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -28,7 +28,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) -> case emqx_redis:on_start(InstId, Config) of {ok, RedisConnSt} -> ?tp( - redis_ee_connector_start_success, + redis_bridge_connector_start_success, #{} ), {ok, #{ @@ -37,7 +37,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) -> }}; {error, _} = Error -> ?tp( - redis_ee_connector_start_error, + redis_bridge_connector_start_error, #{error => Error} ), Error @@ -60,12 +60,12 @@ on_query( ) -> Cmd = proc_command_template(CommandTemplate, Data), ?tp( - redis_ee_connector_cmd, + redis_bridge_connector_cmd, #{cmd => Cmd, batch => false, mode => sync} ), Result = query(InstId, {cmd, Cmd}, RedisConnSt), ?tp( - redis_ee_connector_send_done, + redis_bridge_connector_send_done, #{cmd => Cmd, batch => false, mode => sync, result => Result} ), Result; @@ -75,12 +75,12 @@ on_query( _State = #{conn_st := RedisConnSt} ) -> ?tp( - redis_ee_connector_query, + redis_bridge_connector_query, #{query => Query, batch => false, mode => sync} ), Result = query(InstId, Query, RedisConnSt), ?tp( - redis_ee_connector_send_done, + redis_bridge_connector_send_done, #{query => Query, batch => false, mode => sync, result => Result} ), Result. @@ -90,12 +90,12 @@ on_batch_query( ) -> Cmds = process_batch_data(BatchData, CommandTemplate), ?tp( - redis_ee_connector_send, + redis_bridge_connector_send, #{batch_data => BatchData, batch => true, mode => sync} ), Result = query(InstId, {cmds, Cmds}, RedisConnSt), ?tp( - redis_ee_connector_send_done, + redis_bridge_connector_send_done, #{ batch_data => BatchData, batch_size => length(BatchData), diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl index 242e74b3e..6a0248b67 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl @@ -117,11 +117,9 @@ wait_for_ci_redis(Checks, Config) -> ProxyHost = os:getenv("PROXY_HOST", ?PROXY_HOST), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", ?PROXY_PORT)), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([ - emqx_resource, emqx_bridge, emqx_rule_engine + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine ]), - {ok, _} = application:ensure_all_started(emqx_connector), [ {proxy_host, ProxyHost}, {proxy_port, ProxyPort} @@ -271,21 +269,21 @@ t_check_replay(Config) -> lists:seq(1, ?BATCH_SIZE) ), #{ - ?snk_kind := redis_ee_connector_send_done, + ?snk_kind := redis_bridge_connector_send_done, batch := true, result := {error, _} }, 10_000 ) end), - #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, + #{?snk_kind := redis_bridge_connector_send_done, batch := true, result := {ok, _}}, 10_000 ), fun(Trace) -> ?assert( ?strict_causality( - #{?snk_kind := redis_ee_connector_send_done, result := {error, _}}, - #{?snk_kind := redis_ee_connector_send_done, result := {ok, _}}, + #{?snk_kind := redis_bridge_connector_send_done, result := {error, _}}, + #{?snk_kind := redis_bridge_connector_send_done, result := {ok, _}}, Trace ) ) @@ -308,14 +306,14 @@ t_permanent_error(_Config) -> begin ?wait_async_action( publish_message(Topic, Payload), - #{?snk_kind := redis_ee_connector_send_done}, + #{?snk_kind := redis_bridge_connector_send_done}, 10_000 ) end, fun(Trace) -> ?assertMatch( [#{result := {error, _}} | _], - ?of_kind(redis_ee_connector_send_done, Trace) + ?of_kind(redis_bridge_connector_send_done, Trace) ) end ), @@ -334,7 +332,7 @@ t_create_disconnected(Config) -> fun(Trace) -> ?assertMatch( [#{error := _} | _], - ?of_kind(redis_ee_connector_start_error, Trace) + ?of_kind(redis_bridge_connector_start_error, Trace) ), ok end @@ -365,7 +363,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> end, lists:seq(1, N) ), - #{?snk_kind := redis_ee_connector_send_done, batch := IsBatch}, + #{?snk_kind := redis_bridge_connector_send_done, batch := IsBatch}, 5000 ), fun(Trace) -> @@ -374,13 +372,13 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> true -> ?assertMatch( [#{result := {ok, _}, batch := true, batch_size := ?BATCH_SIZE} | _], - ?of_kind(redis_ee_connector_send_done, Trace) + ?of_kind(redis_bridge_connector_send_done, Trace) ), ?assertEqual(?BATCH_SIZE, AddedMsgCount); false -> ?assertMatch( [#{result := {ok, _}, batch := false} | _], - ?of_kind(redis_ee_connector_send_done, Trace) + ?of_kind(redis_bridge_connector_send_done, Trace) ), ?assertEqual(1, AddedMsgCount) end diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index 7da5430a9..e18b98e3a 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,8 +1,8 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib, rocketmq]}, + {applications, [kernel, stdlib, emqx_resource, emqx_bridge, rocketmq]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index 62e1a7b3f..1a5133b84 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -109,10 +109,11 @@ common_init(ConfigT) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + % Ensure enterprise bridge module is loaded + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, emqx_resource, emqx_bridge, rocketmq + ]), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), {Name, RocketMQConf} = rocketmq_config(BridgeType, Config0), Config = diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index e5c5ae73d..35f4587b0 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,8 +1,8 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib, odbc]}, + {applications, [kernel, stdlib, emqx_resource, emqx_bridge, odbc]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl index 0e60e9c97..101ead838 100644 --- a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl +++ b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl @@ -416,10 +416,9 @@ common_init(ConfigT) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + % Ensure enterprise bridge module is loaded + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, odbc]), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % Connect to sqlserver directly % drop old db and table, and then create new ones diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index 97d8ff2e5..e4c946162 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,8 +1,14 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, - {applications, [kernel, stdlib, tdengine]}, + {applications, [ + kernel, + stdlib, + emqx_resource, + emqx_bridge, + tdengine + ]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 9399f6029..54744d806 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -142,10 +142,9 @@ common_init(ConfigT) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + % Ensure enterprise bridge module is loaded + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, tdengine]), + _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), % Connect to tdengine directly and create the table connect_and_create_table(Config0), diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index f533f3b04..7a4aeeb56 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -1,8 +1,8 @@ {application, emqx_bridge_timescale, [ {description, "EMQX Enterprise TimescaleDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib]}, + {applications, [kernel, stdlib, emqx_resource, emqx_bridge]}, {env, []}, {modules, []}, {links, []} diff --git a/changes/ee/feat-10203.en.md b/changes/ee/feat-10203.en.md new file mode 100644 index 000000000..a2ff3b3bb --- /dev/null +++ b/changes/ee/feat-10203.en.md @@ -0,0 +1 @@ +Add HStreamDB bridge support, adapted to the HStreamDB `v0.15.0`. diff --git a/lib-ee/emqx_ee_bridge/.gitignore b/lib-ee/emqx_ee_bridge/.gitignore deleted file mode 100644 index f1c455451..000000000 --- a/lib-ee/emqx_ee_bridge/.gitignore +++ /dev/null @@ -1,19 +0,0 @@ -.rebar3 -_* -.eunit -*.o -*.beam -*.plt -*.swp -*.swo -.erlang.cookie -ebin -log -erl_crash.dump -.rebar -logs -_build -.idea -*.iml -rebar3.crashdump -*~ diff --git a/lib-ee/emqx_ee_bridge/README.md b/lib-ee/emqx_ee_bridge/README.md deleted file mode 100644 index 5cb4d8694..000000000 --- a/lib-ee/emqx_ee_bridge/README.md +++ /dev/null @@ -1,9 +0,0 @@ -emqx_ee_bridge -===== - -An OTP application - -Build ------ - - $ rebar3 compile diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct deleted file mode 100644 index 80f0d394b..000000000 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ /dev/null @@ -1 +0,0 @@ -toxiproxy diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config deleted file mode 100644 index 3b3be6ccf..000000000 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ /dev/null @@ -1,11 +0,0 @@ -%% -*- mode: erlang; -*- -{erl_opts, [debug_info]}. -{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}} - , {emqx_resource, {path, "../../apps/emqx_resource"}} - , {emqx_bridge, {path, "../../apps/emqx_bridge"}} - , {emqx_utils, {path, "../emqx_utils"}} - ]}. - -{shell, [ - {apps, [emqx_ee_bridge]} -]}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src deleted file mode 100644 index e03cc9423..000000000 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ /dev/null @@ -1,27 +0,0 @@ -{application, emqx_ee_bridge, [ - {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.16"}, - {registered, []}, - {applications, [ - kernel, - stdlib, - emqx_ee_connector, - telemetry, - emqx_bridge_kafka, - emqx_bridge_gcp_pubsub, - emqx_bridge_cassandra, - emqx_bridge_opents, - emqx_bridge_pulsar, - emqx_bridge_dynamo, - emqx_bridge_sqlserver, - emqx_bridge_rocketmq, - emqx_bridge_rabbitmq, - emqx_bridge_tdengine, - emqx_bridge_influxdb, - emqx_bridge_clickhouse - ]}, - {env, []}, - {modules, []}, - - {links, []} -]}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl deleted file mode 100644 index 13a70e7c7..000000000 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl +++ /dev/null @@ -1,90 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ee_bridge_hstreamdb). - --include_lib("typerefl/include/types.hrl"). --include_lib("hocon/include/hoconsc.hrl"). - --import(hoconsc, [mk/2, enum/1, ref/2]). - --export([ - conn_bridge_examples/1 -]). - --export([ - namespace/0, - roots/0, - fields/1, - desc/1 -]). - -%% ------------------------------------------------------------------------------------------------- -%% api - -conn_bridge_examples(Method) -> - [ - #{ - <<"hstreamdb">> => #{ - summary => <<"HStreamDB Bridge">>, - value => values(Method) - } - } - ]. - -values(_Method) -> - #{ - type => hstreamdb, - name => <<"demo">>, - connector => <<"hstreamdb:connector">>, - enable => true, - direction => egress, - local_topic => <<"local/topic/#">>, - payload => <<"${payload}">> - }. - -%% ------------------------------------------------------------------------------------------------- -%% Hocon Schema Definitions -namespace() -> "bridge_hstreamdb". - -roots() -> []. - -fields("config") -> - [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, - {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})}, - {connector, field(connector)} - ]; -fields("post") -> - [type_field(), name_field() | fields("config")]; -fields("put") -> - fields("config"); -fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"). - -field(connector) -> - mk( - hoconsc:union([binary(), ref(emqx_ee_connector_hstreamdb, config)]), - #{ - required => true, - example => <<"hstreamdb:demo">>, - desc => ?DESC("desc_connector") - } - ). - -desc("config") -> - ?DESC("desc_config"); -desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for HStream using `", string:to_upper(Method), "` method."]; -desc(_) -> - undefined. - -%% ------------------------------------------------------------------------------------------------- -%% internal -type_field() -> - {type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}. - -name_field() -> - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_bridge/test/ee_bridge_hstreamdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/ee_bridge_hstreamdb_SUITE.erl deleted file mode 100644 index 867b09f32..000000000 --- a/lib-ee/emqx_ee_bridge/test/ee_bridge_hstreamdb_SUITE.erl +++ /dev/null @@ -1,16 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(ee_bridge_hstreamdb_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - -all() -> - emqx_common_test_helpers:all(?MODULE). - -%% TODO: diff --git a/lib-ee/emqx_ee_connector/.gitignore b/lib-ee/emqx_ee_connector/.gitignore deleted file mode 100644 index f1c455451..000000000 --- a/lib-ee/emqx_ee_connector/.gitignore +++ /dev/null @@ -1,19 +0,0 @@ -.rebar3 -_* -.eunit -*.o -*.beam -*.plt -*.swp -*.swo -.erlang.cookie -ebin -log -erl_crash.dump -.rebar -logs -_build -.idea -*.iml -rebar3.crashdump -*~ diff --git a/lib-ee/emqx_ee_connector/README.md b/lib-ee/emqx_ee_connector/README.md deleted file mode 100644 index e665af458..000000000 --- a/lib-ee/emqx_ee_connector/README.md +++ /dev/null @@ -1,9 +0,0 @@ -emqx_ee_connector -===== - -An OTP application - -Build ------ - - $ rebar3 compile diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src deleted file mode 100644 index c3187f807..000000000 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ /dev/null @@ -1,16 +0,0 @@ -{application, emqx_ee_connector, [ - {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.15"}, - {registered, []}, - {applications, [ - kernel, - stdlib, - ecpool, - hstreamdb_erl, - emqx_redis - ]}, - {env, []}, - {modules, []}, - - {links, []} -]}. diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_hstreamdb_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_hstreamdb_SUITE.erl deleted file mode 100644 index ad49d9f62..000000000 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_hstreamdb_SUITE.erl +++ /dev/null @@ -1,16 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_ee_connector_hstreamdb_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - -all() -> - emqx_common_test_helpers:all(?MODULE). - -%% TODO: diff --git a/mix.exs b/mix.exs index 1cdea809f..9e9279895 100644 --- a/mix.exs +++ b/mix.exs @@ -195,7 +195,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ - {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.3.1+v0.12.0"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, @@ -395,8 +395,6 @@ defmodule EMQXUmbrella.MixProject do do: [ emqx_license: :permanent, emqx_enterprise: :load, - emqx_ee_connector: :permanent, - emqx_ee_bridge: :permanent, emqx_bridge_kafka: :permanent, emqx_bridge_pulsar: :permanent, emqx_bridge_gcp_pubsub: :permanent, diff --git a/rebar.config.erl b/rebar.config.erl index 312fc1173..5f86afaa2 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -463,8 +463,6 @@ relx_apps_per_edition(ee) -> [ emqx_license, {emqx_enterprise, load}, - emqx_ee_connector, - emqx_ee_bridge, emqx_bridge_kafka, emqx_bridge_pulsar, emqx_bridge_gcp_pubsub, diff --git a/rel/i18n/emqx_ee_bridge_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb.hocon similarity index 79% rename from rel/i18n/emqx_ee_bridge_hstreamdb.hocon rename to rel/i18n/emqx_bridge_hstreamdb.hocon index cb43d483a..10700d4eb 100644 --- a/rel/i18n/emqx_ee_bridge_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb.hocon @@ -1,4 +1,4 @@ -emqx_ee_bridge_hstreamdb { +emqx_bridge_hstreamdb { config_direction.desc: """The direction of this bridge, MUST be 'egress'""" @@ -6,12 +6,6 @@ config_direction.desc: config_direction.label: """Bridge Direction""" -config_enable.desc: -"""Enable or disable this bridge""" - -config_enable.label: -"""Enable Or Disable Bridge""" - desc_config.desc: """Configuration for an HStreamDB bridge.""" @@ -46,10 +40,10 @@ will be forwarded.""" local_topic.label: """Local Topic""" -payload.desc: -"""The payload to be forwarded to the HStreamDB. Placeholders supported.""" +record_template.desc: +"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported.""" -payload.label: -"""Payload""" +record_template.label: +"""HStream Record""" } diff --git a/rel/i18n/emqx_ee_connector_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon similarity index 77% rename from rel/i18n/emqx_ee_connector_hstreamdb.hocon rename to rel/i18n/emqx_bridge_hstreamdb_connector.hocon index f6838297f..c0faa794c 100644 --- a/rel/i18n/emqx_ee_connector_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon @@ -1,4 +1,4 @@ -emqx_ee_connector_hstreamdb { +emqx_bridge_hstreamdb_connector { config.desc: """HStreamDB connection config""" @@ -6,16 +6,34 @@ config.desc: config.label: """Connection config""" +type.desc: +"""The Connector Type.""" + +type.label: +"""Connector Type""" + name.desc: """Connector name, used as a human-readable description of the connector.""" name.label: """Connector Name""" -ordering_key.desc: +url.desc: +"""HStreamDB Server URL""" + +url.label: +"""HStreamDB Server URL""" + +stream_name.desc: +"""HStreamDB Stream Name""" + +stream_name.label: +"""HStreamDB Stream Name""" + +partition_key.desc: """HStreamDB Ordering Key""" -ordering_key.label: +partition_key.label: """HStreamDB Ordering Key""" pool_size.desc: @@ -24,22 +42,10 @@ pool_size.desc: pool_size.label: """HStreamDB Pool Size""" -stream_name.desc: -"""HStreamDB Stream Name""" +grpc_timeout.desc: +"""HStreamDB gRPC Timeout""" -stream_name.label: -"""HStreamDB Stream Name""" - -type.desc: -"""The Connector Type.""" - -type.label: -"""Connector Type""" - -url.desc: -"""HStreamDB Server URL""" - -url.label: -"""HStreamDB Server URL""" +grpc_timeout.label: +"""HStreamDB gRPC Timeout""" } diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index ea49f6e21..e4061f7cb 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -216,6 +216,9 @@ for dep in ${CT_DEPS}; do gcp_emulator) FILES+=( '.ci/docker-compose-file/docker-compose-gcp-emulator.yaml' ) ;; + hstreamdb) + FILES+=( '.ci/docker-compose-file/docker-compose-hstreamdb.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 03587aa54..953b0b762 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -270,6 +270,10 @@ hstream hstreamDB hstream hstreamdb +hrecord +hRecord +Hrecord +HRecord SASL GSSAPI keytab