Merge pull request #11203 from JimMoen/feat-hstream-bridge
feat: hstreamdb bridge with batch query
This commit is contained in:
commit
a8d96ea36a
|
@ -0,0 +1,123 @@
|
|||
version: "3.5"
|
||||
|
||||
services:
|
||||
hserver:
|
||||
image: hstreamdb/hstream:v0.15.0
|
||||
container_name: hstreamdb
|
||||
depends_on:
|
||||
- zookeeper
|
||||
- hstore
|
||||
# ports:
|
||||
# - "127.0.0.1:6570:6570"
|
||||
expose:
|
||||
- 6570
|
||||
networks:
|
||||
- emqx_bridge
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
- /tmp:/tmp
|
||||
- data_store:/data/store
|
||||
command:
|
||||
- bash
|
||||
- "-c"
|
||||
- |
|
||||
set -e
|
||||
/usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
|
||||
/usr/local/bin/hstream-server \
|
||||
--bind-address 0.0.0.0 --port 6570 \
|
||||
--internal-port 6571 \
|
||||
--server-id 100 \
|
||||
--seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \
|
||||
--advertised-address $$(hostname -I | awk '{print $$1}') \
|
||||
--metastore-uri zk://zookeeper:2181 \
|
||||
--store-config /data/store/logdevice.conf \
|
||||
--store-admin-host hstore --store-admin-port 6440 \
|
||||
--store-log-level warning \
|
||||
--io-tasks-path /tmp/io/tasks \
|
||||
--io-tasks-network emqx_bridge
|
||||
|
||||
hstore:
|
||||
image: hstreamdb/hstream:v0.15.0
|
||||
networks:
|
||||
- emqx_bridge
|
||||
volumes:
|
||||
- data_store:/data/store
|
||||
command:
|
||||
- bash
|
||||
- "-c"
|
||||
- |
|
||||
set -ex
|
||||
# N.B. "enable-dscp-reflection=false" is required for linux kernel which
|
||||
# doesn't support dscp reflection, e.g. centos7.
|
||||
/usr/local/bin/ld-dev-cluster --root /data/store \
|
||||
--use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
|
||||
--user-admin-port 6440 \
|
||||
--param enable-dscp-reflection=false \
|
||||
--no-interactive
|
||||
|
||||
zookeeper:
|
||||
image: zookeeper
|
||||
expose:
|
||||
- 2181
|
||||
networks:
|
||||
- emqx_bridge
|
||||
volumes:
|
||||
- data_zk_data:/data
|
||||
- data_zk_datalog:/datalog
|
||||
|
||||
## The three container `hstream-exporter`, `prometheus`, `console`
|
||||
## is for HStreamDB Web Console
|
||||
## But HStreamDB Console is not supported in v0.15.0
|
||||
## because of HStreamApi proto changed
|
||||
# hstream-exporter:
|
||||
# depends_on:
|
||||
# hserver:
|
||||
# condition: service_completed_successfully
|
||||
# image: hstreamdb/hstream-exporter
|
||||
# networks:
|
||||
# - hstream-quickstart
|
||||
# command:
|
||||
# - bash
|
||||
# - "-c"
|
||||
# - |
|
||||
# set -ex
|
||||
# hstream-exporter --addr hstream://hserver:6570
|
||||
|
||||
# prometheus:
|
||||
# image: prom/prometheus
|
||||
# expose:
|
||||
# - 9097
|
||||
# networks:
|
||||
# - hstream-quickstart
|
||||
# ports:
|
||||
# - "9097:9090"
|
||||
# volumes:
|
||||
# - $PWD/prometheus:/etc/prometheus
|
||||
|
||||
# console:
|
||||
# image: hstreamdb/hstream-console
|
||||
# depends_on:
|
||||
# - hserver
|
||||
# expose:
|
||||
# - 5177
|
||||
# networks:
|
||||
# - hstream-quickstart
|
||||
# environment:
|
||||
# - SERVER_PORT=5177
|
||||
# - PROMETHEUS_URL=http://prometheus:9097
|
||||
# - HSTREAM_PUBLIC_ADDRESS=hstream.example.com
|
||||
# - HSTREAM_PRIVATE_ADDRESS=hserver:6570
|
||||
# ports:
|
||||
# - "5177:5177"
|
||||
|
||||
# networks:
|
||||
# hstream-quickstart:
|
||||
# name: hstream-quickstart
|
||||
|
||||
volumes:
|
||||
data_store:
|
||||
name: quickstart_data_store
|
||||
data_zk_data:
|
||||
name: quickstart_data_zk_data
|
||||
data_zk_datalog:
|
||||
name: quickstart_data_zk_datalog
|
|
@ -43,10 +43,12 @@ services:
|
|||
- 19000:19000
|
||||
# S3 TLS
|
||||
- 19100:19100
|
||||
# IOTDB
|
||||
# IOTDB (3 total)
|
||||
- 14242:4242
|
||||
- 28080:18080
|
||||
- 38080:38080
|
||||
# HStreamDB
|
||||
- 15670:5670
|
||||
command:
|
||||
- "-host=0.0.0.0"
|
||||
- "-config=/config/toxiproxy.json"
|
||||
|
|
|
@ -155,5 +155,11 @@
|
|||
"listen": "0.0.0.0:8085",
|
||||
"upstream": "gcp_emulator:8085",
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "hstreamdb",
|
||||
"listen": "0.0.0.0:6570",
|
||||
"upstream": "hstreamdb:6570",
|
||||
"enabled": true
|
||||
}
|
||||
]
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
kernel,
|
||||
stdlib,
|
||||
emqx,
|
||||
emqx_resource,
|
||||
emqx_connector
|
||||
]},
|
||||
{env, []},
|
||||
|
|
|
@ -175,14 +175,14 @@ bridge_info_examples(Method) ->
|
|||
value => info_example(mqtt, Method)
|
||||
}
|
||||
},
|
||||
ee_bridge_examples(Method)
|
||||
emqx_enterprise_bridge_examples(Method)
|
||||
).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
ee_bridge_examples(Method) ->
|
||||
emqx_ee_bridge:examples(Method).
|
||||
emqx_enterprise_bridge_examples(Method) ->
|
||||
emqx_bridge_enterprise:examples(Method).
|
||||
-else.
|
||||
ee_bridge_examples(_Method) -> #{}.
|
||||
emqx_enterprise_bridge_examples(_Method) -> #{}.
|
||||
-endif.
|
||||
|
||||
info_example(Type, Method) ->
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
|
||||
start(_StartType, _StartArgs) ->
|
||||
{ok, Sup} = emqx_bridge_sup:start_link(),
|
||||
ok = start_ee_apps(),
|
||||
ok = ensure_enterprise_schema_loaded(),
|
||||
ok = emqx_bridge:load(),
|
||||
ok = emqx_bridge:load_hook(),
|
||||
ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
|
||||
|
@ -46,11 +46,11 @@ stop(_State) ->
|
|||
ok.
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
start_ee_apps() ->
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
|
||||
ensure_enterprise_schema_loaded() ->
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
ok.
|
||||
-else.
|
||||
start_ee_apps() ->
|
||||
ensure_enterprise_schema_loaded() ->
|
||||
ok.
|
||||
-endif.
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
|
|||
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
|
||||
bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
|
||||
bridge_to_resource_type(webhook) -> emqx_connector_http;
|
||||
bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType).
|
||||
bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType).
|
||||
-else.
|
||||
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
|
||||
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ee_bridge).
|
||||
-module(emqx_bridge_enterprise).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
@ -30,7 +32,7 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"),
|
||||
api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"),
|
||||
api_ref(emqx_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"),
|
||||
api_ref(emqx_ee_bridge_hstreamdb, <<"hstreamdb">>, Method),
|
||||
api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method),
|
||||
api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"),
|
||||
api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"),
|
||||
api_ref(emqx_bridge_redis, <<"redis_single">>, Method ++ "_single"),
|
||||
|
@ -54,7 +56,7 @@ schema_modules() ->
|
|||
[
|
||||
emqx_bridge_kafka,
|
||||
emqx_bridge_cassandra,
|
||||
emqx_ee_bridge_hstreamdb,
|
||||
emqx_bridge_hstreamdb,
|
||||
emqx_bridge_gcp_pubsub,
|
||||
emqx_bridge_influxdb,
|
||||
emqx_bridge_mongodb,
|
||||
|
@ -93,7 +95,7 @@ resource_type(kafka_consumer) -> emqx_bridge_kafka_impl_consumer;
|
|||
%% to hocon; keeping this as just `kafka' for backwards compatibility.
|
||||
resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
|
||||
resource_type(cassandra) -> emqx_bridge_cassandra_connector;
|
||||
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
|
||||
resource_type(hstreamdb) -> emqx_bridge_hstreamdb_connector;
|
||||
resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_impl_producer;
|
||||
resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer;
|
||||
resource_type(mongodb_rs) -> emqx_bridge_mongodb_connector;
|
||||
|
@ -123,7 +125,7 @@ fields(bridges) ->
|
|||
[
|
||||
{hstreamdb,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),
|
||||
hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config")),
|
||||
#{
|
||||
desc => <<"HStreamDB Bridge Config">>,
|
||||
required => false
|
||||
|
@ -365,3 +367,7 @@ rabbitmq_structs() ->
|
|||
|
||||
api_ref(Module, Type, Method) ->
|
||||
{Type, ref(Module, Method)}.
|
||||
|
||||
-else.
|
||||
|
||||
-endif.
|
|
@ -57,7 +57,7 @@ api_schema(Method) ->
|
|||
{<<"mqtt">>, emqx_bridge_mqtt_schema}
|
||||
]
|
||||
],
|
||||
EE = ee_api_schemas(Method),
|
||||
EE = enterprise_api_schemas(Method),
|
||||
hoconsc:union(bridge_api_union(Broker ++ EE)).
|
||||
|
||||
bridge_api_union(Refs) ->
|
||||
|
@ -86,36 +86,23 @@ bridge_api_union(Refs) ->
|
|||
end.
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
ee_api_schemas(Method) ->
|
||||
ensure_loaded(emqx_ee_bridge, emqx_ee_bridge),
|
||||
case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of
|
||||
true -> emqx_ee_bridge:api_schemas(Method);
|
||||
enterprise_api_schemas(Method) ->
|
||||
case erlang:function_exported(emqx_bridge_enterprise, api_schemas, 1) of
|
||||
true -> emqx_bridge_enterprise:api_schemas(Method);
|
||||
false -> []
|
||||
end.
|
||||
|
||||
ee_fields_bridges() ->
|
||||
ensure_loaded(emqx_ee_bridge, emqx_ee_bridge),
|
||||
case erlang:function_exported(emqx_ee_bridge, fields, 1) of
|
||||
true -> emqx_ee_bridge:fields(bridges);
|
||||
enterprise_fields_bridges() ->
|
||||
case erlang:function_exported(emqx_bridge_enterprise, fields, 1) of
|
||||
true -> emqx_bridge_enterprise:fields(bridges);
|
||||
false -> []
|
||||
end.
|
||||
|
||||
%% must ensure the app is loaded before checking if fn is defined.
|
||||
ensure_loaded(App, Mod) ->
|
||||
try
|
||||
_ = application:load(App),
|
||||
_ = Mod:module_info(),
|
||||
ok
|
||||
catch
|
||||
_:_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-else.
|
||||
|
||||
ee_api_schemas(_) -> [].
|
||||
enterprise_api_schemas(_) -> [].
|
||||
|
||||
ee_fields_bridges() -> [].
|
||||
enterprise_fields_bridges() -> [].
|
||||
|
||||
-endif.
|
||||
|
||||
|
@ -191,7 +178,7 @@ fields(bridges) ->
|
|||
end
|
||||
}
|
||||
)}
|
||||
] ++ ee_fields_bridges();
|
||||
] ++ enterprise_fields_bridges();
|
||||
fields("metrics") ->
|
||||
[
|
||||
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
{application, emqx_bridge_cassandra, [
|
||||
{description, "EMQX Enterprise Cassandra Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, ecql]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
ecql
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -396,7 +396,7 @@ conn_opts([Opt | Opts], Acc) ->
|
|||
%% prepare
|
||||
|
||||
%% XXX: hardcode
|
||||
%% note: the `cql` param is passed by emqx_ee_bridge_cassa
|
||||
%% note: the `cql` param is passed by emqx_bridge_cassandra
|
||||
parse_prepare_cql(#{cql := SQL}) ->
|
||||
parse_prepare_cql([{send_message, SQL}], #{}, #{});
|
||||
parse_prepare_cql(_) ->
|
||||
|
|
|
@ -170,9 +170,8 @@ common_init(Config0) ->
|
|||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
% Connect to cassnadra directly and create the table
|
||||
catch connect_and_drop_table(Config0),
|
||||
|
|
|
@ -56,7 +56,6 @@ init_per_suite(Config) ->
|
|||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_connector),
|
||||
%% keyspace `mqtt` must be created in advance
|
||||
{ok, Conn} =
|
||||
ecql:connect([
|
||||
|
@ -79,8 +78,7 @@ init_per_suite(Config) ->
|
|||
end_per_suite(_Config) ->
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||
_ = application:stop(emqx_connector),
|
||||
_ = application:stop(emqx_ee_connector).
|
||||
_ = application:stop(emqx_connector).
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
Config.
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
{application, emqx_bridge_clickhouse, [
|
||||
{description, "EMQX Enterprise ClickHouse Bridge"},
|
||||
{vsn, "0.2.1"},
|
||||
{vsn, "0.2.2"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, clickhouse, emqx_resource]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
clickhouse
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -469,7 +469,7 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
|
|||
reason => ClickhouseErrorResult
|
||||
}),
|
||||
case is_recoverable_error(ClickhouseErrorResult) of
|
||||
%% TODO: The hackeny errors that the clickhouse library forwards are
|
||||
%% TODO: The hackney errors that the clickhouse library forwards are
|
||||
%% very loosely defined. We should try to make sure that the following
|
||||
%% handles all error cases that we need to handle as recoverable_error
|
||||
true ->
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||
|
||||
%% See comment in
|
||||
%% lib-ee/emqx_ee_connector/test/ee_bridge_clickhouse_connector_SUITE.erl for how to
|
||||
%% apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl for how to
|
||||
%% run this without bringing up the whole CI infrastucture
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
{application, emqx_bridge_dynamo, [
|
||||
{description, "EMQX Enterprise Dynamo Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, erlcloud]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
erlcloud
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -88,7 +88,7 @@ init_per_suite(Config) ->
|
|||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_conf, erlcloud]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
|
@ -128,10 +128,12 @@ common_init(ConfigT) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([
|
||||
emqx_conf, emqx_resource, emqx_bridge
|
||||
]),
|
||||
_ = application:ensure_all_started(erlcloud),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
% setup dynamo
|
||||
setup_dynamo(Config0),
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
{application, emqx_bridge_gcp_pubsub, [
|
||||
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
|
||||
{vsn, "0.1.3"},
|
||||
{vsn, "0.1.4"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
ehttpc
|
||||
]},
|
||||
{env, []},
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
service_account_json_converter/1
|
||||
]).
|
||||
|
||||
%% emqx_ee_bridge "unofficial" API
|
||||
%% emqx_bridge_enterprise "unofficial" API
|
||||
-export([conn_bridge_examples/1]).
|
||||
|
||||
-type service_account_json() :: map().
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
toxiproxy
|
||||
influxdb
|
||||
hstreamdb
|
|
@ -0,0 +1,5 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(HSTREAMDB_DEFAULT_PORT, 6570).
|
|
@ -1,11 +1,11 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{erl_opts, [debug_info]}.
|
||||
{deps, [
|
||||
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
|
||||
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.3.1+v0.12.0"}}},
|
||||
{emqx, {path, "../../apps/emqx"}},
|
||||
{emqx_utils, {path, "../../apps/emqx_utils"}}
|
||||
]}.
|
||||
|
||||
{shell, [
|
||||
{apps, [emqx_ee_connector]}
|
||||
{apps, [emqx_bridge_hstreamdb]}
|
||||
]}.
|
|
@ -1,8 +1,13 @@
|
|||
{application, emqx_bridge_hstreamdb, [
|
||||
{description, "EMQX Enterprise HStreamDB Bridge"},
|
||||
{vsn, "0.1.0"},
|
||||
{vsn, "0.1.1"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_hstreamdb).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% api
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"hstreamdb">> => #{
|
||||
summary => <<"HStreamDB Bridge">>,
|
||||
value => values(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
values(get) ->
|
||||
values(post);
|
||||
values(put) ->
|
||||
values(post);
|
||||
values(post) ->
|
||||
#{
|
||||
type => <<"hstreamdb">>,
|
||||
name => <<"demo">>,
|
||||
direction => <<"egress">>,
|
||||
url => <<"http://127.0.0.1:6570">>,
|
||||
stream => <<"stream">>,
|
||||
%% raw HRecord
|
||||
record_template =>
|
||||
<<"{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }">>,
|
||||
pool_size => 8,
|
||||
%% grpc_timeout => <<"1m">>
|
||||
resource_opts => #{
|
||||
query_mode => sync,
|
||||
batch_size => 100,
|
||||
batch_time => <<"20ms">>
|
||||
},
|
||||
ssl => #{enable => false}
|
||||
};
|
||||
values(_) ->
|
||||
#{}.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Hocon Schema Definitions
|
||||
namespace() -> "bridge_hstreamdb".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
hstream_bridge_common_fields() ++
|
||||
connector_fields();
|
||||
fields("post") ->
|
||||
hstream_bridge_common_fields() ++
|
||||
connector_fields() ++
|
||||
type_name_fields();
|
||||
fields("get") ->
|
||||
hstream_bridge_common_fields() ++
|
||||
connector_fields() ++
|
||||
type_name_fields() ++
|
||||
emqx_bridge_schema:status_fields();
|
||||
fields("put") ->
|
||||
hstream_bridge_common_fields() ++
|
||||
connector_fields().
|
||||
|
||||
hstream_bridge_common_fields() ->
|
||||
emqx_bridge_schema:common_bridge_fields() ++
|
||||
[
|
||||
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
||||
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
||||
{record_template,
|
||||
mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}
|
||||
] ++
|
||||
emqx_resource_schema:fields("resource_opts").
|
||||
|
||||
connector_fields() ->
|
||||
emqx_bridge_hstreamdb_connector:fields(config).
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% internal
|
||||
type_name_fields() ->
|
||||
[
|
||||
{type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})},
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
|
||||
].
|
|
@ -1,11 +1,12 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ee_connector_hstreamdb).
|
||||
-module(emqx_bridge_hstreamdb_connector).
|
||||
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1]).
|
||||
|
||||
|
@ -17,6 +18,7 @@
|
|||
on_start/2,
|
||||
on_stop/2,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
]).
|
||||
|
||||
|
@ -28,10 +30,15 @@
|
|||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1,
|
||||
connector_examples/1
|
||||
desc/1
|
||||
]).
|
||||
|
||||
%% Allocatable resources
|
||||
-define(hstreamdb_client, hstreamdb_client).
|
||||
|
||||
-define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)).
|
||||
-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% resource callback
|
||||
callback_mode() -> always_sync.
|
||||
|
@ -39,25 +46,52 @@ callback_mode() -> always_sync.
|
|||
on_start(InstId, Config) ->
|
||||
start_client(InstId, Config).
|
||||
|
||||
on_stop(InstId, #{client := Client, producer := Producer}) ->
|
||||
StopClientRes = hstreamdb:stop_client(Client),
|
||||
StopProducerRes = hstreamdb:stop_producer(Producer),
|
||||
?SLOG(info, #{
|
||||
msg => "stop hstreamdb connector",
|
||||
connector => InstId,
|
||||
client => Client,
|
||||
producer => Producer,
|
||||
stop_client => StopClientRes,
|
||||
stop_producer => StopProducerRes
|
||||
}).
|
||||
on_stop(InstId, _State) ->
|
||||
case emqx_resource:get_allocated_resources(InstId) of
|
||||
#{client := Client, producer := Producer} ->
|
||||
StopClientRes = hstreamdb:stop_client(Client),
|
||||
StopProducerRes = hstreamdb:stop_producer(Producer),
|
||||
?SLOG(info, #{
|
||||
msg => "stop hstreamdb connector",
|
||||
connector => InstId,
|
||||
client => Client,
|
||||
producer => Producer,
|
||||
stop_client => StopClientRes,
|
||||
stop_producer => StopProducerRes
|
||||
});
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-define(FAILED_TO_APPLY_HRECORD_TEMPLATE,
|
||||
{error, {unrecoverable_error, failed_to_apply_hrecord_template}}
|
||||
).
|
||||
|
||||
on_query(
|
||||
_InstId,
|
||||
{send_message, Data},
|
||||
#{producer := Producer, ordering_key := OrderingKey, payload := Payload}
|
||||
_State = #{
|
||||
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
|
||||
}
|
||||
) ->
|
||||
Record = to_record(OrderingKey, Payload, Data),
|
||||
do_append(Producer, Record).
|
||||
try to_record(PartitionKey, HRecordTemplate, Data) of
|
||||
Record -> append_record(Producer, Record)
|
||||
catch
|
||||
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
|
||||
end.
|
||||
|
||||
on_batch_query(
|
||||
_InstId,
|
||||
BatchList,
|
||||
_State = #{
|
||||
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
|
||||
}
|
||||
) ->
|
||||
try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
|
||||
Records -> append_record(Producer, Records)
|
||||
catch
|
||||
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
|
||||
end.
|
||||
|
||||
on_get_status(_InstId, #{client := Client}) ->
|
||||
case is_alive(Client) of
|
||||
|
@ -87,43 +121,16 @@ fields(config) ->
|
|||
[
|
||||
{url, mk(binary(), #{required => true, desc => ?DESC("url")})},
|
||||
{stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})},
|
||||
{ordering_key, mk(binary(), #{required => false, desc => ?DESC("ordering_key")})},
|
||||
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}
|
||||
];
|
||||
fields("get") ->
|
||||
fields("post");
|
||||
fields("put") ->
|
||||
fields(config);
|
||||
fields("post") ->
|
||||
[
|
||||
{type, mk(hstreamdb, #{required => true, desc => ?DESC("type")})},
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("name")})}
|
||||
] ++ fields("put").
|
||||
{partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})},
|
||||
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})},
|
||||
{grpc_timeout, fun grpc_timeout/1}
|
||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"hstreamdb">> => #{
|
||||
summary => <<"HStreamDB Connector">>,
|
||||
value => values(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
values(post) ->
|
||||
maps:merge(values(put), #{name => <<"connector">>});
|
||||
values(get) ->
|
||||
values(post);
|
||||
values(put) ->
|
||||
#{
|
||||
type => hstreamdb,
|
||||
url => <<"http://127.0.0.1:6570">>,
|
||||
stream => <<"stream1">>,
|
||||
ordering_key => <<"some_key">>,
|
||||
pool_size => 8
|
||||
};
|
||||
values(_) ->
|
||||
#{}.
|
||||
grpc_timeout(type) -> emqx_schema:timeout_duration_ms();
|
||||
grpc_timeout(desc) -> ?DESC("grpc_timeout");
|
||||
grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW;
|
||||
grpc_timeout(required) -> false;
|
||||
grpc_timeout(_) -> undefined.
|
||||
|
||||
desc(config) ->
|
||||
?DESC("config").
|
||||
|
@ -168,6 +175,10 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
|
|||
}),
|
||||
start_producer(InstId, Client, Config);
|
||||
_ ->
|
||||
?tp(
|
||||
hstreamdb_connector_start_failed,
|
||||
#{error => client_not_alive}
|
||||
),
|
||||
?SLOG(error, #{
|
||||
msg => "hstreamdb connector: client not alive",
|
||||
connector => InstId
|
||||
|
@ -202,7 +213,7 @@ is_alive(Client) ->
|
|||
start_producer(
|
||||
InstId,
|
||||
Client,
|
||||
Options = #{stream := Stream, pool_size := PoolSize, egress := #{payload := PayloadBin}}
|
||||
Options = #{stream := Stream, pool_size := PoolSize}
|
||||
) ->
|
||||
%% TODO: change these batch options after we have better disk cache.
|
||||
BatchSize = maps:get(batch_size, Options, 100),
|
||||
|
@ -212,7 +223,8 @@ start_producer(
|
|||
{callback, {?MODULE, on_flush_result, []}},
|
||||
{max_records, BatchSize},
|
||||
{interval, Interval},
|
||||
{pool_size, PoolSize}
|
||||
{pool_size, PoolSize},
|
||||
{grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)}
|
||||
],
|
||||
Name = produce_name(InstId),
|
||||
?SLOG(info, #{
|
||||
|
@ -224,17 +236,18 @@ start_producer(
|
|||
?SLOG(info, #{
|
||||
msg => "hstreamdb connector: producer started"
|
||||
}),
|
||||
EnableBatch = maps:get(enable_batch, Options, false),
|
||||
Payload = emqx_placeholder:preproc_tmpl(PayloadBin),
|
||||
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
|
||||
OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin),
|
||||
State = #{
|
||||
client => Client,
|
||||
producer => Producer,
|
||||
enable_batch => EnableBatch,
|
||||
ordering_key => OrderingKey,
|
||||
payload => Payload
|
||||
enable_batch => maps:get(enable_batch, Options, false),
|
||||
partition_key => emqx_placeholder:preproc_tmpl(
|
||||
maps:get(partition_key, Options, <<"">>)
|
||||
),
|
||||
record_template => record_template(Options)
|
||||
},
|
||||
ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{
|
||||
client => Client, producer => Producer
|
||||
}),
|
||||
{ok, State};
|
||||
{error, {already_started, Pid}} ->
|
||||
?SLOG(info, #{
|
||||
|
@ -253,47 +266,53 @@ start_producer(
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
to_record(OrderingKeyTmpl, PayloadTmpl, Data) ->
|
||||
OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data),
|
||||
Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data),
|
||||
to_record(OrderingKey, Payload).
|
||||
to_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
|
||||
PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data),
|
||||
RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data),
|
||||
to_record(PartitionKey, RawRecord).
|
||||
|
||||
to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->
|
||||
to_record(binary_to_list(OrderingKey), Payload);
|
||||
to_record(OrderingKey, Payload) ->
|
||||
hstreamdb:to_record(OrderingKey, raw, Payload).
|
||||
to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) ->
|
||||
to_record(binary_to_list(PartitionKey), RawRecord);
|
||||
to_record(PartitionKey, RawRecord) ->
|
||||
hstreamdb:to_record(PartitionKey, raw, RawRecord).
|
||||
|
||||
do_append(Producer, Record) ->
|
||||
do_append(false, Producer, Record).
|
||||
to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
|
||||
Records0 = lists:map(
|
||||
fun({send_message, Data}) ->
|
||||
to_record(PartitionKeyTmpl, HRecordTmpl, Data)
|
||||
end,
|
||||
BatchList
|
||||
),
|
||||
PartitionKeys = proplists:get_keys(Records0),
|
||||
[
|
||||
{PartitionKey, proplists:get_all_values(PartitionKey, Records0)}
|
||||
|| PartitionKey <- PartitionKeys
|
||||
].
|
||||
|
||||
%% TODO: this append is async, remove or change it after we have better disk cache.
|
||||
% do_append(true, Producer, Record) ->
|
||||
% case hstreamdb:append(Producer, Record) of
|
||||
% ok ->
|
||||
% ?SLOG(debug, #{
|
||||
% msg => "hstreamdb producer async append success",
|
||||
% record => Record
|
||||
% });
|
||||
% {error, Reason} = Err ->
|
||||
% ?SLOG(error, #{
|
||||
% msg => "hstreamdb producer async append failed",
|
||||
% reason => Reason,
|
||||
% record => Record
|
||||
% }),
|
||||
% Err
|
||||
% end;
|
||||
do_append(false, Producer, Record) ->
|
||||
%% TODO: this append is sync, but it does not support [Record], can only append one Record.
|
||||
%% Change it after we have better dick cache.
|
||||
append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) ->
|
||||
lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords);
|
||||
append_record(Producer, Record) when is_tuple(Record) ->
|
||||
do_append_records(false, Producer, Record).
|
||||
|
||||
%% TODO: only sync request supported. implement async request later.
|
||||
do_append_records(false, Producer, Record) ->
|
||||
case hstreamdb:append_flush(Producer, Record) of
|
||||
{ok, _} ->
|
||||
{ok, _Result} ->
|
||||
?tp(
|
||||
hstreamdb_connector_query_return,
|
||||
#{result => _Result}
|
||||
),
|
||||
?SLOG(debug, #{
|
||||
msg => "hstreamdb producer sync append success",
|
||||
msg => "HStreamDB producer sync append success",
|
||||
record => Record
|
||||
});
|
||||
{error, Reason} = Err ->
|
||||
?tp(
|
||||
hstreamdb_connector_query_return,
|
||||
#{error => Reason}
|
||||
),
|
||||
?SLOG(error, #{
|
||||
msg => "hstreamdb producer sync append failed",
|
||||
msg => "HStreamDB producer sync append failed",
|
||||
reason => Reason,
|
||||
record => Record
|
||||
}),
|
||||
|
@ -306,6 +325,11 @@ client_name(InstId) ->
|
|||
produce_name(ActionId) ->
|
||||
list_to_atom("producer:" ++ to_string(ActionId)).
|
||||
|
||||
record_template(#{record_template := RawHRecordTemplate}) ->
|
||||
emqx_placeholder:preproc_tmpl(RawHRecordTemplate);
|
||||
record_template(_) ->
|
||||
emqx_placeholder:preproc_tmpl(<<"${payload}">>).
|
||||
|
||||
to_string(List) when is_list(List) -> List;
|
||||
to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||
to_string(Atom) when is_atom(Atom) -> atom_to_list(Atom).
|
|
@ -0,0 +1,578 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_hstreamdb_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("emqx_bridge_hstreamdb.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
% SQL definitions
|
||||
-define(STREAM, "stream").
|
||||
-define(REPLICATION_FACTOR, 1).
|
||||
%% in seconds
|
||||
-define(BACKLOG_RETENTION_SECOND, (24 * 60 * 60)).
|
||||
-define(SHARD_COUNT, 1).
|
||||
|
||||
-define(BRIDGE_NAME, <<"hstreamdb_demo_bridge">>).
|
||||
-define(RECORD_TEMPLATE,
|
||||
"{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }"
|
||||
).
|
||||
|
||||
-define(POOL_SIZE, 8).
|
||||
-define(BATCH_SIZE, 10).
|
||||
-define(GRPC_TIMEOUT, "1s").
|
||||
|
||||
-define(WORKER_POOL_SIZE, 4).
|
||||
|
||||
-define(WITH_CLIENT(Process),
|
||||
Client = connect_direct_hstream(_Name = test_c, Config),
|
||||
Process,
|
||||
ok = disconnect(Client)
|
||||
).
|
||||
|
||||
%% How to run it locally (all commands are run in $PROJ_ROOT dir):
|
||||
%% A: run ct on host
|
||||
%% 1. Start all deps services
|
||||
%% ```bash
|
||||
%% sudo docker compose -f .ci/docker-compose-file/docker-compose.yaml \
|
||||
%% -f .ci/docker-compose-file/docker-compose-hstreamdb.yaml \
|
||||
%% -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
|
||||
%% up --build
|
||||
%% ```
|
||||
%%
|
||||
%% 2. Run use cases with special environment variables
|
||||
%% 6570 is toxiproxy exported port.
|
||||
%% Local:
|
||||
%% ```bash
|
||||
%% HSTREAMDB_HOST=$REAL_TOXIPROXY_IP HSTREAMDB_PORT=6570 \
|
||||
%% PROXY_HOST=$REAL_TOXIPROXY_IP PROXY_PORT=6570 \
|
||||
%% ./rebar3 as test ct -c -v --readable true --name ct@127.0.0.1 \
|
||||
%% --suite apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl
|
||||
%% ```
|
||||
%%
|
||||
%% B: run ct in docker container
|
||||
%% run script:
|
||||
%% ```bash
|
||||
%% ./scripts/ct/run.sh --ci --app apps/emqx_bridge_hstreamdb/ -- \
|
||||
%% --name 'test@127.0.0.1' -c -v --readable true \
|
||||
%% --suite apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl
|
||||
%% ````
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% CT boilerplate
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, sync}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||
NonBatchCases = [t_write_timeout],
|
||||
BatchingGroups = [{group, with_batch}, {group, without_batch}],
|
||||
[
|
||||
{sync, BatchingGroups},
|
||||
{with_batch, TCs -- NonBatchCases},
|
||||
{without_batch, TCs}
|
||||
].
|
||||
|
||||
init_per_group(sync, Config) ->
|
||||
[{query_mode, sync} | Config];
|
||||
init_per_group(with_batch, Config0) ->
|
||||
Config = [{enable_batch, true} | Config0],
|
||||
common_init(Config);
|
||||
init_per_group(without_batch, Config0) ->
|
||||
Config = [{enable_batch, false} | Config0],
|
||||
common_init(Config);
|
||||
init_per_group(_Group, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
|
||||
connect_and_delete_stream(Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
ok;
|
||||
end_per_group(_Group, _Config) ->
|
||||
ok.
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_conf, hstreamdb_erl]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(t_to_hrecord_failed, Config) ->
|
||||
meck:new([hstreamdb], [passthrough, no_history, no_link]),
|
||||
meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end),
|
||||
Config;
|
||||
init_per_testcase(_Testcase, Config) ->
|
||||
%% drop stream and will create a new one in common_init/1
|
||||
%% TODO: create a new stream for each test case
|
||||
delete_bridge(Config),
|
||||
snabbkaffe:start_trace(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(t_to_hrecord_failed, _Config) ->
|
||||
meck:unload([hstreamdb]);
|
||||
end_per_testcase(_Testcase, Config) ->
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
ok = snabbkaffe:stop(),
|
||||
delete_bridge(Config),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_setup_via_config_and_publish(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Data = rand_data(),
|
||||
?check_trace(
|
||||
begin
|
||||
?wait_async_action(
|
||||
?assertEqual(ok, send_message(Config, Data)),
|
||||
#{?snk_kind := hstreamdb_connector_query_return},
|
||||
10_000
|
||||
),
|
||||
ok
|
||||
end,
|
||||
fun(Trace0) ->
|
||||
Trace = ?of_kind(hstreamdb_connector_query_return, Trace0),
|
||||
lists:foreach(
|
||||
fun(EachTrace) ->
|
||||
?assertMatch(#{result := #{streamName := <<?STREAM>>}}, EachTrace)
|
||||
end,
|
||||
Trace
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_setup_via_http_api_and_publish(Config) ->
|
||||
BridgeType = ?config(hstreamdb_bridge_type, Config),
|
||||
Name = ?config(hstreamdb_name, Config),
|
||||
HStreamDBConfig0 = ?config(hstreamdb_config, Config),
|
||||
HStreamDBConfig = HStreamDBConfig0#{
|
||||
<<"name">> => Name,
|
||||
<<"type">> => BridgeType
|
||||
},
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge_http(HStreamDBConfig)
|
||||
),
|
||||
Data = rand_data(),
|
||||
?check_trace(
|
||||
begin
|
||||
?wait_async_action(
|
||||
?assertEqual(ok, send_message(Config, Data)),
|
||||
#{?snk_kind := hstreamdb_connector_query_return},
|
||||
10_000
|
||||
),
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{result := #{streamName := <<?STREAM>>}}],
|
||||
?of_kind(hstreamdb_connector_query_return, Trace)
|
||||
)
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_get_status(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyName = ?config(proxy_name, Config),
|
||||
|
||||
health_check_resource_ok(Config),
|
||||
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||
health_check_resource_down(Config)
|
||||
end),
|
||||
ok.
|
||||
|
||||
t_create_disconnected(Config) ->
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyName = ?config(proxy_name, Config),
|
||||
|
||||
?check_trace(
|
||||
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||
?assertMatch({ok, _}, create_bridge(Config))
|
||||
end),
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{error := client_not_alive}],
|
||||
?of_kind(hstreamdb_connector_start_failed, Trace)
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
%% TODO: Investigate why reconnection takes at least 5 seconds during ct.
|
||||
%% While in practical applications, recovers to the 'connected' state
|
||||
%% within 3 seconds after toxiproxy being enabled.'"
|
||||
%% timer:sleep(10000),
|
||||
restart_resource(Config),
|
||||
health_check_resource_ok(Config),
|
||||
ok.
|
||||
|
||||
t_write_failure(Config) ->
|
||||
ProxyName = ?config(proxy_name, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
QueryMode = ?config(query_mode, Config),
|
||||
Data = rand_data(),
|
||||
{{ok, _}, {ok, _}} =
|
||||
?wait_async_action(
|
||||
create_bridge(Config),
|
||||
#{?snk_kind := resource_connected_enter},
|
||||
20_000
|
||||
),
|
||||
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||
health_check_resource_down(Config),
|
||||
case QueryMode of
|
||||
sync ->
|
||||
?assertMatch(
|
||||
{error, {resource_error, #{msg := "call resource timeout", reason := timeout}}},
|
||||
send_message(Config, Data)
|
||||
);
|
||||
async ->
|
||||
%% TODO: async mode is not supported yet,
|
||||
%% but it will return ok if calling emqx_resource_buffer_worker:async_query/3,
|
||||
?assertMatch(
|
||||
ok,
|
||||
send_message(Config, Data)
|
||||
)
|
||||
end
|
||||
end),
|
||||
ok.
|
||||
|
||||
t_simple_query(Config) ->
|
||||
BatchSize = batch_size(Config),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Requests = gen_batch_req(BatchSize),
|
||||
?check_trace(
|
||||
begin
|
||||
?wait_async_action(
|
||||
lists:foreach(
|
||||
fun(Request) ->
|
||||
?assertEqual(ok, query_resource(Config, Request))
|
||||
end,
|
||||
Requests
|
||||
),
|
||||
#{?snk_kind := hstreamdb_connector_query_return},
|
||||
10_000
|
||||
)
|
||||
end,
|
||||
fun(Trace0) ->
|
||||
Trace = ?of_kind(hstreamdb_connector_query_return, Trace0),
|
||||
lists:foreach(
|
||||
fun(EachTrace) ->
|
||||
?assertMatch(#{result := #{streamName := <<?STREAM>>}}, EachTrace)
|
||||
end,
|
||||
Trace
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_to_hrecord_failed(Config) ->
|
||||
QueryMode = ?config(query_mode, Config),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
Result = send_message(Config, #{}),
|
||||
case QueryMode of
|
||||
sync ->
|
||||
?assertMatch(
|
||||
{error, {unrecoverable_error, failed_to_apply_hrecord_template}},
|
||||
Result
|
||||
)
|
||||
%% TODO: async mode is not supported yet
|
||||
end,
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
common_init(ConfigT) ->
|
||||
Host = os:getenv("HSTREAMDB_HOST", "toxiproxy"),
|
||||
RawPort = os:getenv("HSTREAMDB_PORT", str(?HSTREAMDB_DEFAULT_PORT)),
|
||||
Port = list_to_integer(RawPort),
|
||||
URL = "http://" ++ Host ++ ":" ++ RawPort,
|
||||
|
||||
Config0 = [
|
||||
{hstreamdb_host, Host},
|
||||
{hstreamdb_port, Port},
|
||||
{hstreamdb_url, URL},
|
||||
%% see also for `proxy_name` : $PROJ_ROOT/.ci/docker-compose-file/toxiproxy.json
|
||||
{proxy_name, "hstreamdb"},
|
||||
{batch_size, batch_size(ConfigT)}
|
||||
| ConfigT
|
||||
],
|
||||
|
||||
BridgeType = proplists:get_value(bridge_type, Config0, <<"hstreamdb">>),
|
||||
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
|
||||
true ->
|
||||
% Setup toxiproxy
|
||||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_resource, emqx_bridge]),
|
||||
_ = application:ensure_all_started(hstreamdb_erl),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
% Connect to hstreamdb directly
|
||||
% drop old stream and then create new one
|
||||
connect_and_delete_stream(Config0),
|
||||
connect_and_create_stream(Config0),
|
||||
{Name, HStreamDBConf} = hstreamdb_config(BridgeType, Config0),
|
||||
Config =
|
||||
[
|
||||
{hstreamdb_config, HStreamDBConf},
|
||||
{hstreamdb_bridge_type, BridgeType},
|
||||
{hstreamdb_name, Name},
|
||||
{proxy_host, ProxyHost},
|
||||
{proxy_port, ProxyPort}
|
||||
| Config0
|
||||
],
|
||||
Config;
|
||||
false ->
|
||||
case os:getenv("IS_CI") of
|
||||
"yes" ->
|
||||
throw(no_hstreamdb);
|
||||
_ ->
|
||||
{skip, no_hstreamdb}
|
||||
end
|
||||
end.
|
||||
|
||||
hstreamdb_config(BridgeType, Config) ->
|
||||
Port = integer_to_list(?config(hstreamdb_port, Config)),
|
||||
URL = "http://" ++ ?config(hstreamdb_host, Config) ++ ":" ++ Port,
|
||||
Name = ?BRIDGE_NAME,
|
||||
BatchSize = batch_size(Config),
|
||||
ConfigString =
|
||||
io_lib:format(
|
||||
"bridges.~s.~s {\n"
|
||||
" enable = true\n"
|
||||
" url = ~p\n"
|
||||
" stream = ~p\n"
|
||||
" record_template = ~p\n"
|
||||
" pool_size = ~p\n"
|
||||
" grpc_timeout = ~p\n"
|
||||
" resource_opts = {\n"
|
||||
%% always sync
|
||||
" query_mode = sync\n"
|
||||
" request_ttl = 500ms\n"
|
||||
" batch_size = ~b\n"
|
||||
" worker_pool_size = ~b\n"
|
||||
" }\n"
|
||||
"}",
|
||||
[
|
||||
BridgeType,
|
||||
Name,
|
||||
URL,
|
||||
?STREAM,
|
||||
?RECORD_TEMPLATE,
|
||||
?POOL_SIZE,
|
||||
?GRPC_TIMEOUT,
|
||||
BatchSize,
|
||||
?WORKER_POOL_SIZE
|
||||
]
|
||||
),
|
||||
{Name, parse_and_check(ConfigString, BridgeType, Name)}.
|
||||
|
||||
parse_and_check(ConfigString, BridgeType, Name) ->
|
||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||
#{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
|
||||
Config.
|
||||
|
||||
-define(RPC_OPTIONS, #{pool_size => 4}).
|
||||
|
||||
-define(CONN_ATTEMPTS, 10).
|
||||
|
||||
default_options(Config) ->
|
||||
[
|
||||
{url, ?config(hstreamdb_url, Config)},
|
||||
{rpc_options, ?RPC_OPTIONS}
|
||||
].
|
||||
|
||||
connect_direct_hstream(Name, Config) ->
|
||||
client(Name, Config, ?CONN_ATTEMPTS).
|
||||
|
||||
client(_Name, _Config, N) when N =< 0 -> error(cannot_connect);
|
||||
client(Name, Config, N) ->
|
||||
try
|
||||
_ = hstreamdb:stop_client(Name),
|
||||
{ok, Client} = hstreamdb:start_client(Name, default_options(Config)),
|
||||
{ok, echo} = hstreamdb:echo(Client),
|
||||
Client
|
||||
catch
|
||||
Class:Error ->
|
||||
ct:print("Error connecting: ~p", [{Class, Error}]),
|
||||
ct:sleep(timer:seconds(1)),
|
||||
client(Name, Config, N - 1)
|
||||
end.
|
||||
|
||||
disconnect(Client) ->
|
||||
hstreamdb:stop_client(Client).
|
||||
|
||||
create_bridge(Config) ->
|
||||
create_bridge(Config, _Overrides = #{}).
|
||||
|
||||
create_bridge(Config, Overrides) ->
|
||||
BridgeType = ?config(hstreamdb_bridge_type, Config),
|
||||
Name = ?config(hstreamdb_name, Config),
|
||||
HSDBConfig0 = ?config(hstreamdb_config, Config),
|
||||
HSDBConfig = emqx_utils_maps:deep_merge(HSDBConfig0, Overrides),
|
||||
emqx_bridge:create(BridgeType, Name, HSDBConfig).
|
||||
|
||||
delete_bridge(Config) ->
|
||||
BridgeType = ?config(hstreamdb_bridge_type, Config),
|
||||
Name = ?config(hstreamdb_name, Config),
|
||||
emqx_bridge:remove(BridgeType, Name).
|
||||
|
||||
create_bridge_http(Params) ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
|
||||
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
send_message(Config, Data) ->
|
||||
Name = ?config(hstreamdb_name, Config),
|
||||
BridgeType = ?config(hstreamdb_bridge_type, Config),
|
||||
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
||||
emqx_bridge:send_message(BridgeID, Data).
|
||||
|
||||
query_resource(Config, Request) ->
|
||||
Name = ?config(hstreamdb_name, Config),
|
||||
BridgeType = ?config(hstreamdb_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
|
||||
|
||||
restart_resource(Config) ->
|
||||
BridgeName = ?config(hstreamdb_name, Config),
|
||||
BridgeType = ?config(hstreamdb_bridge_type, Config),
|
||||
emqx_bridge:disable_enable(disable, BridgeType, BridgeName),
|
||||
timer:sleep(200),
|
||||
emqx_bridge:disable_enable(enable, BridgeType, BridgeName).
|
||||
|
||||
resource_id(Config) ->
|
||||
BridgeName = ?config(hstreamdb_name, Config),
|
||||
BridgeType = ?config(hstreamdb_bridge_type, Config),
|
||||
_ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName).
|
||||
|
||||
health_check_resource_ok(Config) ->
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))).
|
||||
|
||||
health_check_resource_down(Config) ->
|
||||
case emqx_resource_manager:health_check(resource_id(Config)) of
|
||||
{ok, Status} when Status =:= disconnected orelse Status =:= connecting ->
|
||||
ok;
|
||||
{error, timeout} ->
|
||||
ok;
|
||||
Other ->
|
||||
?assert(
|
||||
false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other]))
|
||||
)
|
||||
end.
|
||||
|
||||
% These funs start and then stop the hstreamdb connection
|
||||
connect_and_create_stream(Config) ->
|
||||
?WITH_CLIENT(
|
||||
_ = hstreamdb:create_stream(
|
||||
Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT
|
||||
)
|
||||
),
|
||||
%% force write to stream to make it created and ready to be written data for rest cases
|
||||
ProducerOptions = [
|
||||
{pool_size, 4},
|
||||
{stream, ?STREAM},
|
||||
{callback, fun(_) -> ok end},
|
||||
{max_records, 10},
|
||||
{interval, 1000}
|
||||
],
|
||||
?WITH_CLIENT(
|
||||
begin
|
||||
{ok, Producer} = hstreamdb:start_producer(Client, test_producer, ProducerOptions),
|
||||
_ = hstreamdb:append_flush(Producer, hstreamdb:to_record([], raw, rand_payload())),
|
||||
_ = hstreamdb:stop_producer(Producer)
|
||||
end
|
||||
).
|
||||
|
||||
connect_and_delete_stream(Config) ->
|
||||
?WITH_CLIENT(
|
||||
_ = hstreamdb:delete_stream(Client, ?STREAM)
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% help functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
batch_size(Config) ->
|
||||
case ?config(enable_batch, Config) of
|
||||
true -> ?BATCH_SIZE;
|
||||
false -> 1
|
||||
end.
|
||||
|
||||
rand_data() ->
|
||||
#{
|
||||
%% Raw MTTT Payload in binary
|
||||
payload => rand_payload(),
|
||||
id => <<"0005F8F84FFFAFB9F44200000D810002">>,
|
||||
topic => <<"test/topic">>,
|
||||
qos => 0
|
||||
}.
|
||||
|
||||
rand_payload() ->
|
||||
emqx_utils_json:encode(#{
|
||||
temperature => rand:uniform(40), humidity => rand:uniform(100)
|
||||
}).
|
||||
|
||||
gen_batch_req(Count) when
|
||||
is_integer(Count) andalso Count > 0
|
||||
->
|
||||
[{send_message, rand_data()} || _Val <- lists:seq(1, Count)];
|
||||
gen_batch_req(Count) ->
|
||||
ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]).
|
||||
|
||||
str(List) when is_list(List) ->
|
||||
unicode:characters_to_list(List, utf8);
|
||||
str(Bin) when is_binary(Bin) ->
|
||||
unicode:characters_to_list(Bin, utf8);
|
||||
str(Num) when is_number(Num) ->
|
||||
number_to_list(Num).
|
||||
|
||||
number_to_list(Int) when is_integer(Int) ->
|
||||
integer_to_list(Int);
|
||||
number_to_list(Float) when is_float(Float) ->
|
||||
float_to_list(Float, [{decimals, 10}, compact]).
|
|
@ -1,8 +1,14 @@
|
|||
{application, emqx_bridge_influxdb, [
|
||||
{description, "EMQX Enterprise InfluxDB Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, influxdb]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
influxdb
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_bridge_iotdb, [
|
||||
{description, "EMQX Enterprise Apache IoTDB Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{modules, [
|
||||
emqx_bridge_iotdb,
|
||||
emqx_bridge_iotdb_impl
|
||||
|
@ -10,6 +10,9 @@
|
|||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
%% for module emqx_connector_http
|
||||
emqx_connector
|
||||
]},
|
||||
{env, []},
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
desc/1
|
||||
]).
|
||||
|
||||
%% emqx_ee_bridge "unofficial" API
|
||||
%% emqx_bridge_enterprise "unofficial" API
|
||||
-export([conn_bridge_examples/1]).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_bridge_kafka, [
|
||||
{description, "EMQX Enterprise Kafka Bridge"},
|
||||
{vsn, "0.1.4"},
|
||||
{vsn, "0.1.5"},
|
||||
{registered, [emqx_bridge_kafka_consumer_sup]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
telemetry,
|
||||
wolff,
|
||||
brod,
|
||||
|
|
|
@ -40,7 +40,7 @@ query_mode(_) ->
|
|||
|
||||
callback_mode() -> async_if_possible.
|
||||
|
||||
%% @doc Config schema is defined in emqx_ee_bridge_kafka.
|
||||
%% @doc Config schema is defined in emqx_bridge_kafka.
|
||||
on_start(InstId, Config) ->
|
||||
#{
|
||||
authentication := Auth,
|
||||
|
|
|
@ -73,11 +73,9 @@ wait_until_kafka_is_up(Attempts) ->
|
|||
end.
|
||||
|
||||
init_per_suite(Config) ->
|
||||
%% ensure loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
application:load(emqx_bridge),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
%% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
ok = emqx_connector_test_helpers:start_apps(?APPS),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
{application, emqx_bridge_matrix, [
|
||||
{description, "EMQX Enterprise MatrixDB Bridge"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_bridge_mongodb, [
|
||||
{description, "EMQX Enterprise MongoDB Bridge"},
|
||||
{vsn, "0.2.0"},
|
||||
{vsn, "0.2.1"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
@ -8,7 +8,6 @@
|
|||
emqx_connector,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
emqx_ee_bridge,
|
||||
emqx_mongodb
|
||||
]},
|
||||
{env, []},
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
-behaviour(hocon_schema).
|
||||
|
||||
%% emqx_ee_bridge "callbacks"
|
||||
%% emqx_bridge_enterprise "callbacks"
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
]).
|
||||
|
|
|
@ -58,7 +58,7 @@ on_query(InstanceId, {send_message, Message0}, State) ->
|
|||
},
|
||||
Message = render_message(PayloadTemplate, Message0),
|
||||
Res = emqx_mongodb:on_query(InstanceId, {send_message, Message}, NewConnectorState),
|
||||
?tp(mongo_ee_connector_on_query_return, #{result => Res}),
|
||||
?tp(mongo_bridge_connector_on_query_return, #{result => Res}),
|
||||
Res;
|
||||
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
|
||||
emqx_mongodb:on_query(InstanceId, Request, ConnectorState).
|
||||
|
|
|
@ -116,7 +116,7 @@ init_per_suite(Config) ->
|
|||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf, emqx_rule_engine]),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_mongodb, emqx_bridge, emqx_rule_engine, emqx_conf]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_Testcase, Config) ->
|
||||
|
@ -146,9 +146,8 @@ start_apps() ->
|
|||
]).
|
||||
|
||||
ensure_loaded() ->
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = application:load(emqtt),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
ok.
|
||||
|
||||
mongo_type(Config) ->
|
||||
|
@ -354,7 +353,7 @@ t_setup_via_config_and_publish(Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
send_message(Config, #{key => Val}),
|
||||
#{?snk_kind := mongo_ee_connector_on_query_return},
|
||||
#{?snk_kind := mongo_bridge_connector_on_query_return},
|
||||
5_000
|
||||
),
|
||||
?assertMatch(
|
||||
|
@ -379,7 +378,7 @@ t_setup_via_http_api_and_publish(Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
send_message(Config, #{key => Val}),
|
||||
#{?snk_kind := mongo_ee_connector_on_query_return},
|
||||
#{?snk_kind := mongo_bridge_connector_on_query_return},
|
||||
5_000
|
||||
),
|
||||
?assertMatch(
|
||||
|
@ -395,7 +394,7 @@ t_payload_template(Config) ->
|
|||
{ok, {ok, _}} =
|
||||
?wait_async_action(
|
||||
send_message(Config, #{key => Val, clientid => ClientId}),
|
||||
#{?snk_kind := mongo_ee_connector_on_query_return},
|
||||
#{?snk_kind := mongo_bridge_connector_on_query_return},
|
||||
5_000
|
||||
),
|
||||
?assertMatch(
|
||||
|
@ -421,7 +420,7 @@ t_collection_template(Config) ->
|
|||
clientid => ClientId,
|
||||
mycollectionvar => <<"mycol">>
|
||||
}),
|
||||
#{?snk_kind := mongo_ee_connector_on_query_return},
|
||||
#{?snk_kind := mongo_bridge_connector_on_query_return},
|
||||
5_000
|
||||
),
|
||||
?assertMatch(
|
||||
|
|
|
@ -1,8 +1,15 @@
|
|||
{application, emqx_bridge_mysql, [
|
||||
{description, "EMQX Enterprise MySQL Bridge"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, emqx_mysql]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_connector,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
emqx_mysql
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -142,10 +142,9 @@ common_init(Config0) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, emqx_rule_engine]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
% Connect to mysql directly and create the table
|
||||
connect_and_create_table(Config0),
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
{application, emqx_bridge_opents, [
|
||||
{description, "EMQX Enterprise OpenTSDB Bridge"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
opentsdb
|
||||
]},
|
||||
{env, []},
|
||||
|
|
|
@ -53,7 +53,7 @@ init_per_suite(Config) ->
|
|||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
|
||||
ok = emqx_common_test_helpers:stop_apps([opentsdb, emqx_bridge, emqx_resource, emqx_conf]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_Testcase, Config) ->
|
||||
|
@ -91,10 +91,12 @@ common_init(ConfigT) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([
|
||||
emqx_conf, emqx_resource, emqx_bridge
|
||||
]),
|
||||
_ = application:ensure_all_started(opentsdb),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
{Name, OpenTSConf} = opents_config(BridgeType, Config0),
|
||||
Config =
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
{application, emqx_bridge_oracle, [
|
||||
{description, "EMQX Enterprise Oracle Database Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
emqx_oracle
|
||||
]},
|
||||
{env, []},
|
||||
|
|
|
@ -83,8 +83,9 @@ common_init_per_group() ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
application:load(emqx_bridge),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
%% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
ok = emqx_connector_test_helpers:start_apps(?APPS),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
{application, emqx_bridge_pgsql, [
|
||||
{description, "EMQX Enterprise PostgreSQL Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -145,10 +145,9 @@ common_init(Config0) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
% Connect to pgsql directly and create the table
|
||||
connect_and_create_table(Config0),
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
{application, emqx_bridge_pulsar, [
|
||||
{description, "EMQX Pulsar Bridge"},
|
||||
{vsn, "0.1.4"},
|
||||
{vsn, "0.1.5"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
pulsar
|
||||
]},
|
||||
{env, []},
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
%% emqx_ee_bridge "unofficial" API
|
||||
%% emqx_bridge_enterprise "unofficial" API
|
||||
-export([conn_bridge_examples/1]).
|
||||
|
||||
-export([producer_strategy_key_validator/1]).
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
-define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>).
|
||||
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_pulsar]).
|
||||
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]).
|
||||
-define(RULE_TOPIC, "mqtt/rule").
|
||||
-define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
|
||||
|
||||
|
@ -122,9 +122,11 @@ common_init_per_group() ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
application:load(emqx_bridge),
|
||||
%% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:start_apps(?APPS),
|
||||
ok = emqx_common_test_helpers:start_apps(?APPS),
|
||||
{ok, _} = application:ensure_all_started(pulsar),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||
|
@ -518,7 +520,7 @@ cluster(Config) ->
|
|||
Cluster = emqx_common_test_helpers:emqx_cluster(
|
||||
[core, core],
|
||||
[
|
||||
{apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]},
|
||||
{apps, [emqx_conf] ++ ?APPS ++ [pulsar]},
|
||||
{listener_ports, []},
|
||||
{peer_mod, PeerModule},
|
||||
{priv_data_dir, PrivDataDir},
|
||||
|
@ -1097,6 +1099,7 @@ do_t_cluster(Config) ->
|
|||
),
|
||||
{ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
|
||||
{ok, _} = snabbkaffe:receive_events(SRef1),
|
||||
erpc:multicall(Nodes, fun wait_until_producer_connected/0),
|
||||
{ok, _} = snabbkaffe:block_until(
|
||||
?match_n_events(
|
||||
NumNodes,
|
||||
|
@ -1118,7 +1121,6 @@ do_t_cluster(Config) ->
|
|||
end,
|
||||
Nodes
|
||||
),
|
||||
erpc:multicall(Nodes, fun wait_until_producer_connected/0),
|
||||
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
|
||||
?tp(publishing_message, #{}),
|
||||
erpc:call(N2, emqx, publish, [Message0]),
|
||||
|
|
|
@ -1,8 +1,16 @@
|
|||
{application, emqx_bridge_rabbitmq, [
|
||||
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
ecql,
|
||||
rabbit_common,
|
||||
amqp_client
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
%% See comment in
|
||||
%% lib-ee/emqx_ee_connector/test/ee_connector_rabbitmq_SUITE.erl for how to
|
||||
%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
|
||||
%% run this without bringing up the whole CI infrastucture
|
||||
|
||||
rabbit_mq_host() ->
|
||||
|
@ -50,8 +50,6 @@ init_per_suite(Config) ->
|
|||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_connector),
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
|
||||
{ok, _} = application:ensure_all_started(amqp_client),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
|
||||
|
@ -112,7 +110,6 @@ end_per_suite(Config) ->
|
|||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||
_ = application:stop(emqx_connector),
|
||||
_ = application:stop(emqx_ee_connector),
|
||||
_ = application:stop(emqx_bridge),
|
||||
%% Close the channel
|
||||
ok = amqp_channel:close(Channel),
|
||||
|
|
|
@ -48,7 +48,6 @@ init_per_suite(Config) ->
|
|||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(emqx_ee_connector),
|
||||
{ok, _} = application:ensure_all_started(amqp_client),
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
|
||||
[{channel_connection, ChannelConnection} | Config];
|
||||
|
|
|
@ -1,8 +1,15 @@
|
|||
{application, emqx_bridge_redis, [
|
||||
{description, "EMQX Enterprise Redis Bridge"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, emqx_redis]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_connector,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
emqx_redis
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -28,7 +28,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) ->
|
|||
case emqx_redis:on_start(InstId, Config) of
|
||||
{ok, RedisConnSt} ->
|
||||
?tp(
|
||||
redis_ee_connector_start_success,
|
||||
redis_bridge_connector_start_success,
|
||||
#{}
|
||||
),
|
||||
{ok, #{
|
||||
|
@ -37,7 +37,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) ->
|
|||
}};
|
||||
{error, _} = Error ->
|
||||
?tp(
|
||||
redis_ee_connector_start_error,
|
||||
redis_bridge_connector_start_error,
|
||||
#{error => Error}
|
||||
),
|
||||
Error
|
||||
|
@ -60,12 +60,12 @@ on_query(
|
|||
) ->
|
||||
Cmd = proc_command_template(CommandTemplate, Data),
|
||||
?tp(
|
||||
redis_ee_connector_cmd,
|
||||
redis_bridge_connector_cmd,
|
||||
#{cmd => Cmd, batch => false, mode => sync}
|
||||
),
|
||||
Result = query(InstId, {cmd, Cmd}, RedisConnSt),
|
||||
?tp(
|
||||
redis_ee_connector_send_done,
|
||||
redis_bridge_connector_send_done,
|
||||
#{cmd => Cmd, batch => false, mode => sync, result => Result}
|
||||
),
|
||||
Result;
|
||||
|
@ -75,12 +75,12 @@ on_query(
|
|||
_State = #{conn_st := RedisConnSt}
|
||||
) ->
|
||||
?tp(
|
||||
redis_ee_connector_query,
|
||||
redis_bridge_connector_query,
|
||||
#{query => Query, batch => false, mode => sync}
|
||||
),
|
||||
Result = query(InstId, Query, RedisConnSt),
|
||||
?tp(
|
||||
redis_ee_connector_send_done,
|
||||
redis_bridge_connector_send_done,
|
||||
#{query => Query, batch => false, mode => sync, result => Result}
|
||||
),
|
||||
Result.
|
||||
|
@ -90,12 +90,12 @@ on_batch_query(
|
|||
) ->
|
||||
Cmds = process_batch_data(BatchData, CommandTemplate),
|
||||
?tp(
|
||||
redis_ee_connector_send,
|
||||
redis_bridge_connector_send,
|
||||
#{batch_data => BatchData, batch => true, mode => sync}
|
||||
),
|
||||
Result = query(InstId, {cmds, Cmds}, RedisConnSt),
|
||||
?tp(
|
||||
redis_ee_connector_send_done,
|
||||
redis_bridge_connector_send_done,
|
||||
#{
|
||||
batch_data => BatchData,
|
||||
batch_size => length(BatchData),
|
||||
|
|
|
@ -117,11 +117,9 @@ wait_for_ci_redis(Checks, Config) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", ?PROXY_HOST),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", ?PROXY_PORT)),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:start_apps([
|
||||
emqx_resource, emqx_bridge, emqx_rule_engine
|
||||
ok = emqx_common_test_helpers:start_apps([
|
||||
emqx_conf, emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine
|
||||
]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
[
|
||||
{proxy_host, ProxyHost},
|
||||
{proxy_port, ProxyPort}
|
||||
|
@ -271,21 +269,21 @@ t_check_replay(Config) ->
|
|||
lists:seq(1, ?BATCH_SIZE)
|
||||
),
|
||||
#{
|
||||
?snk_kind := redis_ee_connector_send_done,
|
||||
?snk_kind := redis_bridge_connector_send_done,
|
||||
batch := true,
|
||||
result := {error, _}
|
||||
},
|
||||
10_000
|
||||
)
|
||||
end),
|
||||
#{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}},
|
||||
#{?snk_kind := redis_bridge_connector_send_done, batch := true, result := {ok, _}},
|
||||
10_000
|
||||
),
|
||||
fun(Trace) ->
|
||||
?assert(
|
||||
?strict_causality(
|
||||
#{?snk_kind := redis_ee_connector_send_done, result := {error, _}},
|
||||
#{?snk_kind := redis_ee_connector_send_done, result := {ok, _}},
|
||||
#{?snk_kind := redis_bridge_connector_send_done, result := {error, _}},
|
||||
#{?snk_kind := redis_bridge_connector_send_done, result := {ok, _}},
|
||||
Trace
|
||||
)
|
||||
)
|
||||
|
@ -308,14 +306,14 @@ t_permanent_error(_Config) ->
|
|||
begin
|
||||
?wait_async_action(
|
||||
publish_message(Topic, Payload),
|
||||
#{?snk_kind := redis_ee_connector_send_done},
|
||||
#{?snk_kind := redis_bridge_connector_send_done},
|
||||
10_000
|
||||
)
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{result := {error, _}} | _],
|
||||
?of_kind(redis_ee_connector_send_done, Trace)
|
||||
?of_kind(redis_bridge_connector_send_done, Trace)
|
||||
)
|
||||
end
|
||||
),
|
||||
|
@ -334,7 +332,7 @@ t_create_disconnected(Config) ->
|
|||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[#{error := _} | _],
|
||||
?of_kind(redis_ee_connector_start_error, Trace)
|
||||
?of_kind(redis_bridge_connector_start_error, Trace)
|
||||
),
|
||||
ok
|
||||
end
|
||||
|
@ -365,7 +363,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
|
|||
end,
|
||||
lists:seq(1, N)
|
||||
),
|
||||
#{?snk_kind := redis_ee_connector_send_done, batch := IsBatch},
|
||||
#{?snk_kind := redis_bridge_connector_send_done, batch := IsBatch},
|
||||
5000
|
||||
),
|
||||
fun(Trace) ->
|
||||
|
@ -374,13 +372,13 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
|
|||
true ->
|
||||
?assertMatch(
|
||||
[#{result := {ok, _}, batch := true, batch_size := ?BATCH_SIZE} | _],
|
||||
?of_kind(redis_ee_connector_send_done, Trace)
|
||||
?of_kind(redis_bridge_connector_send_done, Trace)
|
||||
),
|
||||
?assertEqual(?BATCH_SIZE, AddedMsgCount);
|
||||
false ->
|
||||
?assertMatch(
|
||||
[#{result := {ok, _}, batch := false} | _],
|
||||
?of_kind(redis_ee_connector_send_done, Trace)
|
||||
?of_kind(redis_bridge_connector_send_done, Trace)
|
||||
),
|
||||
?assertEqual(1, AddedMsgCount)
|
||||
end
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
{application, emqx_bridge_rocketmq, [
|
||||
{description, "EMQX Enterprise RocketMQ Bridge"},
|
||||
{vsn, "0.1.2"},
|
||||
{vsn, "0.1.3"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, rocketmq]},
|
||||
{applications, [kernel, stdlib, emqx_resource, emqx_bridge, rocketmq]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -109,10 +109,11 @@ common_init(ConfigT) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([
|
||||
emqx_conf, emqx_resource, emqx_bridge, rocketmq
|
||||
]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
{Name, RocketMQConf} = rocketmq_config(BridgeType, Config0),
|
||||
Config =
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
{application, emqx_bridge_sqlserver, [
|
||||
{description, "EMQX Enterprise SQL Server Bridge"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, odbc]},
|
||||
{applications, [kernel, stdlib, emqx_resource, emqx_bridge, odbc]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -416,10 +416,9 @@ common_init(ConfigT) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, odbc]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
% Connect to sqlserver directly
|
||||
% drop old db and table, and then create new ones
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
{application, emqx_bridge_tdengine, [
|
||||
{description, "EMQX Enterprise TDEngine Bridge"},
|
||||
{vsn, "0.1.3"},
|
||||
{vsn, "0.1.4"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, tdengine]},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_resource,
|
||||
emqx_bridge,
|
||||
tdengine
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -142,10 +142,9 @@ common_init(ConfigT) ->
|
|||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% Ensure EE bridge module is loaded
|
||||
_ = application:load(emqx_ee_bridge),
|
||||
_ = emqx_ee_bridge:module_info(),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
% Ensure enterprise bridge module is loaded
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, tdengine]),
|
||||
_ = emqx_bridge_enterprise:module_info(),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
% Connect to tdengine directly and create the table
|
||||
connect_and_create_table(Config0),
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
{application, emqx_bridge_timescale, [
|
||||
{description, "EMQX Enterprise TimescaleDB Bridge"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib]},
|
||||
{applications, [kernel, stdlib, emqx_resource, emqx_bridge]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Add HStreamDB bridge support, adapted to the HStreamDB `v0.15.0`.
|
|
@ -1,19 +0,0 @@
|
|||
.rebar3
|
||||
_*
|
||||
.eunit
|
||||
*.o
|
||||
*.beam
|
||||
*.plt
|
||||
*.swp
|
||||
*.swo
|
||||
.erlang.cookie
|
||||
ebin
|
||||
log
|
||||
erl_crash.dump
|
||||
.rebar
|
||||
logs
|
||||
_build
|
||||
.idea
|
||||
*.iml
|
||||
rebar3.crashdump
|
||||
*~
|
|
@ -1,9 +0,0 @@
|
|||
emqx_ee_bridge
|
||||
=====
|
||||
|
||||
An OTP application
|
||||
|
||||
Build
|
||||
-----
|
||||
|
||||
$ rebar3 compile
|
|
@ -1 +0,0 @@
|
|||
toxiproxy
|
|
@ -1,11 +0,0 @@
|
|||
%% -*- mode: erlang; -*-
|
||||
{erl_opts, [debug_info]}.
|
||||
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
|
||||
, {emqx_resource, {path, "../../apps/emqx_resource"}}
|
||||
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
||||
, {emqx_utils, {path, "../emqx_utils"}}
|
||||
]}.
|
||||
|
||||
{shell, [
|
||||
{apps, [emqx_ee_bridge]}
|
||||
]}.
|
|
@ -1,27 +0,0 @@
|
|||
{application, emqx_ee_bridge, [
|
||||
{description, "EMQX Enterprise data bridges"},
|
||||
{vsn, "0.1.16"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx_ee_connector,
|
||||
telemetry,
|
||||
emqx_bridge_kafka,
|
||||
emqx_bridge_gcp_pubsub,
|
||||
emqx_bridge_cassandra,
|
||||
emqx_bridge_opents,
|
||||
emqx_bridge_pulsar,
|
||||
emqx_bridge_dynamo,
|
||||
emqx_bridge_sqlserver,
|
||||
emqx_bridge_rocketmq,
|
||||
emqx_bridge_rabbitmq,
|
||||
emqx_bridge_tdengine,
|
||||
emqx_bridge_influxdb,
|
||||
emqx_bridge_clickhouse
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
||||
{links, []}
|
||||
]}.
|
|
@ -1,90 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ee_bridge_hstreamdb).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% api
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"hstreamdb">> => #{
|
||||
summary => <<"HStreamDB Bridge">>,
|
||||
value => values(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
values(_Method) ->
|
||||
#{
|
||||
type => hstreamdb,
|
||||
name => <<"demo">>,
|
||||
connector => <<"hstreamdb:connector">>,
|
||||
enable => true,
|
||||
direction => egress,
|
||||
local_topic => <<"local/topic/#">>,
|
||||
payload => <<"${payload}">>
|
||||
}.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Hocon Schema Definitions
|
||||
namespace() -> "bridge_hstreamdb".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
||||
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
||||
{payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})},
|
||||
{connector, field(connector)}
|
||||
];
|
||||
fields("post") ->
|
||||
[type_field(), name_field() | fields("config")];
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||
|
||||
field(connector) ->
|
||||
mk(
|
||||
hoconsc:union([binary(), ref(emqx_ee_connector_hstreamdb, config)]),
|
||||
#{
|
||||
required => true,
|
||||
example => <<"hstreamdb:demo">>,
|
||||
desc => ?DESC("desc_connector")
|
||||
}
|
||||
).
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for HStream using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% internal
|
||||
type_field() ->
|
||||
{type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||
|
||||
name_field() ->
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
|
@ -1,16 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(ee_bridge_hstreamdb_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
%% TODO:
|
|
@ -1,19 +0,0 @@
|
|||
.rebar3
|
||||
_*
|
||||
.eunit
|
||||
*.o
|
||||
*.beam
|
||||
*.plt
|
||||
*.swp
|
||||
*.swo
|
||||
.erlang.cookie
|
||||
ebin
|
||||
log
|
||||
erl_crash.dump
|
||||
.rebar
|
||||
logs
|
||||
_build
|
||||
.idea
|
||||
*.iml
|
||||
rebar3.crashdump
|
||||
*~
|
|
@ -1,9 +0,0 @@
|
|||
emqx_ee_connector
|
||||
=====
|
||||
|
||||
An OTP application
|
||||
|
||||
Build
|
||||
-----
|
||||
|
||||
$ rebar3 compile
|
|
@ -1,16 +0,0 @@
|
|||
{application, emqx_ee_connector, [
|
||||
{description, "EMQX Enterprise connectors"},
|
||||
{vsn, "0.1.15"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
ecpool,
|
||||
hstreamdb_erl,
|
||||
emqx_redis
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
||||
{links, []}
|
||||
]}.
|
|
@ -1,16 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ee_connector_hstreamdb_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
%% TODO:
|
4
mix.exs
4
mix.exs
|
@ -195,7 +195,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
|
||||
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
|
||||
[
|
||||
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
|
||||
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.3.1+v0.12.0"},
|
||||
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
|
||||
{:wolff, github: "kafka4beam/wolff", tag: "1.7.6"},
|
||||
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
|
||||
|
@ -395,8 +395,6 @@ defmodule EMQXUmbrella.MixProject do
|
|||
do: [
|
||||
emqx_license: :permanent,
|
||||
emqx_enterprise: :load,
|
||||
emqx_ee_connector: :permanent,
|
||||
emqx_ee_bridge: :permanent,
|
||||
emqx_bridge_kafka: :permanent,
|
||||
emqx_bridge_pulsar: :permanent,
|
||||
emqx_bridge_gcp_pubsub: :permanent,
|
||||
|
|
|
@ -463,8 +463,6 @@ relx_apps_per_edition(ee) ->
|
|||
[
|
||||
emqx_license,
|
||||
{emqx_enterprise, load},
|
||||
emqx_ee_connector,
|
||||
emqx_ee_bridge,
|
||||
emqx_bridge_kafka,
|
||||
emqx_bridge_pulsar,
|
||||
emqx_bridge_gcp_pubsub,
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
emqx_ee_bridge_hstreamdb {
|
||||
emqx_bridge_hstreamdb {
|
||||
|
||||
config_direction.desc:
|
||||
"""The direction of this bridge, MUST be 'egress'"""
|
||||
|
@ -6,12 +6,6 @@ config_direction.desc:
|
|||
config_direction.label:
|
||||
"""Bridge Direction"""
|
||||
|
||||
config_enable.desc:
|
||||
"""Enable or disable this bridge"""
|
||||
|
||||
config_enable.label:
|
||||
"""Enable Or Disable Bridge"""
|
||||
|
||||
desc_config.desc:
|
||||
"""Configuration for an HStreamDB bridge."""
|
||||
|
||||
|
@ -46,10 +40,10 @@ will be forwarded."""
|
|||
local_topic.label:
|
||||
"""Local Topic"""
|
||||
|
||||
payload.desc:
|
||||
"""The payload to be forwarded to the HStreamDB. Placeholders supported."""
|
||||
record_template.desc:
|
||||
"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported."""
|
||||
|
||||
payload.label:
|
||||
"""Payload"""
|
||||
record_template.label:
|
||||
"""HStream Record"""
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
emqx_ee_connector_hstreamdb {
|
||||
emqx_bridge_hstreamdb_connector {
|
||||
|
||||
config.desc:
|
||||
"""HStreamDB connection config"""
|
||||
|
@ -6,16 +6,34 @@ config.desc:
|
|||
config.label:
|
||||
"""Connection config"""
|
||||
|
||||
type.desc:
|
||||
"""The Connector Type."""
|
||||
|
||||
type.label:
|
||||
"""Connector Type"""
|
||||
|
||||
name.desc:
|
||||
"""Connector name, used as a human-readable description of the connector."""
|
||||
|
||||
name.label:
|
||||
"""Connector Name"""
|
||||
|
||||
ordering_key.desc:
|
||||
url.desc:
|
||||
"""HStreamDB Server URL"""
|
||||
|
||||
url.label:
|
||||
"""HStreamDB Server URL"""
|
||||
|
||||
stream_name.desc:
|
||||
"""HStreamDB Stream Name"""
|
||||
|
||||
stream_name.label:
|
||||
"""HStreamDB Stream Name"""
|
||||
|
||||
partition_key.desc:
|
||||
"""HStreamDB Ordering Key"""
|
||||
|
||||
ordering_key.label:
|
||||
partition_key.label:
|
||||
"""HStreamDB Ordering Key"""
|
||||
|
||||
pool_size.desc:
|
||||
|
@ -24,22 +42,10 @@ pool_size.desc:
|
|||
pool_size.label:
|
||||
"""HStreamDB Pool Size"""
|
||||
|
||||
stream_name.desc:
|
||||
"""HStreamDB Stream Name"""
|
||||
grpc_timeout.desc:
|
||||
"""HStreamDB gRPC Timeout"""
|
||||
|
||||
stream_name.label:
|
||||
"""HStreamDB Stream Name"""
|
||||
|
||||
type.desc:
|
||||
"""The Connector Type."""
|
||||
|
||||
type.label:
|
||||
"""Connector Type"""
|
||||
|
||||
url.desc:
|
||||
"""HStreamDB Server URL"""
|
||||
|
||||
url.label:
|
||||
"""HStreamDB Server URL"""
|
||||
grpc_timeout.label:
|
||||
"""HStreamDB gRPC Timeout"""
|
||||
|
||||
}
|
|
@ -216,6 +216,9 @@ for dep in ${CT_DEPS}; do
|
|||
gcp_emulator)
|
||||
FILES+=( '.ci/docker-compose-file/docker-compose-gcp-emulator.yaml' )
|
||||
;;
|
||||
hstreamdb)
|
||||
FILES+=( '.ci/docker-compose-file/docker-compose-hstreamdb.yaml' )
|
||||
;;
|
||||
*)
|
||||
echo "unknown_ct_dependency $dep"
|
||||
exit 1
|
||||
|
|
|
@ -270,6 +270,10 @@ hstream
|
|||
hstreamDB
|
||||
hstream
|
||||
hstreamdb
|
||||
hrecord
|
||||
hRecord
|
||||
Hrecord
|
||||
HRecord
|
||||
SASL
|
||||
GSSAPI
|
||||
keytab
|
||||
|
|
Loading…
Reference in New Issue