From 3d4afd65dfcb545a0e453864bc924d2962917de5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 30 Aug 2022 15:47:29 -0300 Subject: [PATCH 1/3] feat: add mongodb bridge (e5.0) --- .../docker-compose-mongo-replicaset-tcp.yaml | 8 +- .../docker-compose-mongo-sharded-tcp.yaml | 90 +++++++ .github/workflows/elixir_release.yml | 19 +- .github/workflows/run_test_cases.yaml | 7 + .../src/schema/emqx_bridge_schema.erl | 18 ++ .../src/emqx_connector_mongo.erl | 45 +++- .../i18n/emqx_ee_bridge_mongodb.conf | 67 +++++ lib-ee/emqx_ee_bridge/rebar.config | 6 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 18 +- .../src/emqx_ee_bridge_mongodb.erl | 154 +++++++++++ .../test/emqx_ee_bridge_mongodb_SUITE.erl | 251 ++++++++++++++++++ lib-ee/emqx_ee_connector/rebar.config | 3 +- scripts/docker-ct-apps | 1 + 13 files changed, 673 insertions(+), 14 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml create mode 100644 lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf create mode 100644 lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl create mode 100644 lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl diff --git a/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml index f83fe0932..54506abd8 100644 --- a/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml @@ -18,7 +18,7 @@ services: --ipv6 --bind_ip_all --replSet rs0 - + mongo2: hostname: mongo2 container_name: mongo2 @@ -54,10 +54,10 @@ services: --ipv6 --bind_ip_all --replSet rs0 - - mongo_client: + + mongo_rs_client: image: mongo:${MONGO_TAG} - container_name: mongo_client + container_name: mongo_rs_client networks: - emqx_bridge depends_on: diff --git a/.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml new file mode 100644 index 000000000..a8b51689b --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml @@ -0,0 +1,90 @@ +version: "3" + +services: + mongosharded1: + hostname: mongosharded1 + container_name: mongosharded1 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27014:27017 + restart: always + command: + --configsvr + --replSet cfg0 + --port 27017 + --ipv6 + --bind_ip_all + + mongosharded2: + hostname: mongosharded2 + container_name: mongosharded2 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27015:27017 + restart: always + command: + --shardsvr + --replSet rs0 + --port 27017 + --ipv6 + --bind_ip_all + + mongosharded3: + hostname: mongosharded3 + container_name: mongosharded3 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27016:27017 + restart: always + entrypoint: mongos + command: + --configdb cfg0/mongosharded1:27017 + --port 27017 + --ipv6 + --bind_ip_all + + mongosharded_client: + image: mongo:${MONGO_TAG} + container_name: mongosharded_client + networks: + - emqx_bridge + depends_on: + - mongosharded1 + - mongosharded2 + - mongosharded3 + command: + - /bin/bash + - -c + - | + while ! mongo --host mongosharded1 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do + sleep 1 + done + mongo --host mongosharded1 --eval "rs.initiate( { _id : 'cfg0', configsvr: true, members: [ { _id : 0, host : 'mongosharded1:27017' } ] })" + while ! mongo --host mongosharded2 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do + sleep 1 + done + mongo --host mongosharded2 --eval "rs.initiate( { _id : 'rs0', members: [ { _id : 0, host : 'mongosharded2:27017' } ] })" + mongo --host mongosharded2 --eval "rs.status()" + while ! mongo --host mongosharded3 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do + sleep 1 + done + mongo --host mongosharded3 --eval "sh.addShard('rs0/mongosharded2:27017')" + mongo --host mongosharded3 --eval "sh.enableSharding('mqtt')" diff --git a/.github/workflows/elixir_release.yml b/.github/workflows/elixir_release.yml index 006d6aba8..b703f2fde 100644 --- a/.github/workflows/elixir_release.yml +++ b/.github/workflows/elixir_release.yml @@ -12,7 +12,18 @@ on: jobs: elixir_release_build: runs-on: ubuntu-latest - container: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04 + strategy: + matrix: + otp: + - 24.2.1-1 + elixir: + - 1.13.4 + os: + - ubuntu20.04 + profile: + - emqx + - emqx-enterprise + container: ghcr.io/emqx/emqx-builder/5.0-17:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }} steps: - name: Checkout @@ -23,15 +34,15 @@ jobs: run: | git config --global --add safe.directory "$GITHUB_WORKSPACE" - name: elixir release - run: make emqx-elixir + run: make ${{ matrix.profile }}-elixir - name: start release run: | - cd _build/emqx/rel/emqx + cd _build/${{ matrix.profile }}/rel/emqx bin/emqx start - name: check if started run: | sleep 10 nc -zv localhost 1883 - cd _build/emqx/rel/emqx + cd _build/${{ matrix.profile }}/rel/emqx bin/emqx ping bin/emqx ctl status diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index b4b797621..5d0dd1ca2 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -124,6 +124,8 @@ jobs: docker-compose \ -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \ + -f .ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tls.yaml \ -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \ @@ -135,6 +137,11 @@ jobs: -f .ci/docker-compose-file/docker-compose.yaml \ up -d --build + - name: wait some services to be fully up + run: | + docker wait mongosharded_client + docker wait mongo_rs_client + # produces .coverdata - name: run common test working-directory: source diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index f55ac840e..af74ae2ec 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -41,12 +41,16 @@ api_schema(Method) -> hoconsc:union(Broker ++ EE). ee_api_schemas(Method) -> + %% must ensure the app is loaded before checking if fn is defined. + ensure_loaded(emqx_ee_bridge, emqx_ee_bridge), case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of true -> emqx_ee_bridge:api_schemas(Method); false -> [] end. ee_fields_bridges() -> + %% must ensure the app is loaded before checking if fn is defined. + ensure_loaded(emqx_ee_bridge, emqx_ee_bridge), case erlang:function_exported(emqx_ee_bridge, fields, 1) of true -> emqx_ee_bridge:fields(bridges); false -> [] @@ -155,3 +159,17 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. + +%%================================================================================================= +%% Internal fns +%%================================================================================================= + +ensure_loaded(App, Mod) -> + try + _ = application:load(App), + _ = Mod:module_info(), + ok + catch + _:_ -> + ok + end. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 07208545f..5bfcbe009 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -37,7 +37,7 @@ -export([roots/0, fields/1, desc/1]). --export([mongo_query/5, check_worker_health/1]). +-export([mongo_query/5, mongo_insert/3, check_worker_health/1]). -define(HEALTH_CHECK_TIMEOUT, 30000). @@ -177,9 +177,16 @@ on_start( {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), + Collection = maps:get(collection, Config, <<"mqtt">>), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of - ok -> {ok, #{poolname => PoolName, type => Type}}; - {error, Reason} -> {error, Reason} + ok -> + {ok, #{ + poolname => PoolName, + type => Type, + collection => Collection + }}; + {error, Reason} -> + {error, Reason} end. on_stop(InstId, #{poolname := PoolName}) -> @@ -189,6 +196,35 @@ on_stop(InstId, #{poolname := PoolName}) -> }), emqx_plugin_libs_pool:stop_pool(PoolName). +on_query( + InstId, + {send_message, Document}, + #{poolname := PoolName, collection := Collection} = State +) -> + Request = {insert, Collection, Document}, + ?TRACE( + "QUERY", + "mongodb_connector_received", + #{request => Request, connector => InstId, state => State} + ), + case + ecpool:pick_and_do( + PoolName, + {?MODULE, mongo_insert, [Collection, Document]}, + no_handover + ) + of + {{false, Reason}, _Document} -> + ?SLOG(error, #{ + msg => "mongodb_connector_do_query_failed", + request => Request, + reason => Reason, + connector => InstId + }), + {error, Reason}; + {{true, _Info}, _Document} -> + ok + end; on_query( InstId, {Action, Collection, Filter, Projector}, @@ -292,6 +328,9 @@ mongo_query(Conn, find_one, Collection, Filter, Projector) -> mongo_query(_Conn, _Action, _Collection, _Filter, _Projector) -> ok. +mongo_insert(Conn, Collection, Documents) -> + mongo_api:insert(Conn, Collection, Documents). + init_type(#{mongo_type := rs, replica_set_name := ReplicaSetName}) -> {rs, ReplicaSetName}; init_type(#{mongo_type := Type}) -> diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf new file mode 100644 index 000000000..fef3663ef --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf @@ -0,0 +1,67 @@ +emqx_ee_bridge_mongodb { + desc_config { + desc { + en: "Configuration for MongoDB Bridge" + zh: "为MongoDB桥配置" + } + label { + en: "MongoDB Bridge Configuration" + zh: "MongoDB桥配置" + } + } + + enable { + desc { + en: "Enable or disable this MongoDB Bridge" + zh: "启用或停用该MongoDB桥" + } + label { + en: "Enable or disable" + zh: "启用或禁用" + } + } + + collection { + desc { + en: "The collection where data will be stored into" + zh: "数据将被存储到的集合" + } + label { + en: "Collection to be used" + zh: "将要使用的藏品" + } + } + + mongodb_rs_conf { + desc { + en: "MongoDB (Replica Set) configuration" + zh: "MongoDB(Replica Set)配置" + } + label { + en: "MongoDB (Replica Set) Configuration" + zh: "MongoDB(Replica Set)配置" + } + } + + mongodb_sharded_conf { + desc { + en: "MongoDB (Sharded) configuration" + zh: "MongoDB (Sharded)配置" + } + label { + en: "MongoDB (Sharded) Configuration" + zh: "MongoDB (Sharded)配置" + } + } + + mongodb_single_conf { + desc { + en: "MongoDB (Standalone) configuration" + zh: "MongoDB(独立)配置" + } + label { + en: "MongoDB (Standalone) Configuration" + zh: "MongoDB(独立)配置" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 5dd22ccef..e986d7983 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,5 +1,9 @@ {erl_opts, [debug_info]}. -{deps, []}. +{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} + , {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. {shell, [ {apps, [emqx_ee_bridge]} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index ad1dfaee8..840b963cd 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -15,6 +15,9 @@ api_schemas(Method) -> [ ref(emqx_ee_bridge_mysql, Method), + ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), + ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), + ref(emqx_ee_bridge_mongodb, Method ++ "_single"), ref(emqx_ee_bridge_hstreamdb, Method), ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), @@ -25,6 +28,7 @@ schema_modules() -> [ emqx_ee_bridge_hstreamdb, emqx_ee_bridge_influxdb, + emqx_ee_bridge_mongodb, emqx_ee_bridge_mysql ]. @@ -42,6 +46,9 @@ examples(Method) -> resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; +resource_type(mongodb_rs) -> emqx_connector_mongo; +resource_type(mongodb_sharded) -> emqx_connector_mongo; +resource_type(mongodb_single) -> emqx_connector_mongo; resource_type(mysql) -> emqx_connector_mysql; resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; @@ -59,7 +66,16 @@ fields(bridges) -> hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), #{desc => <<"EMQX Enterprise Config">>} )} - ] ++ fields(influxdb); + ] ++ fields(mongodb) ++ fields(influxdb); +fields(mongodb) -> + [ + {Type, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_mongodb, Type)), + #{desc => <<"EMQX Enterprise Config">>} + )} + || Type <- [mongodb_rs, mongodb_sharded, mongodb_single] + ]; fields(influxdb) -> [ {Protocol, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl new file mode 100644 index 000000000..9d9a5e4d0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_mongodb). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-behaviour(hocon_schema). + +%% emqx_ee_bridge "callbacks" +-export([ + conn_bridge_examples/1 +]). + +%% hocon_schema callbacks +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%%================================================================================================= +%% hocon_schema API +%%================================================================================================= + +namespace() -> + "bridge_mongodb". + +roots() -> + []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, + {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})} + ]; +fields(mongodb_rs) -> + emqx_connector_mongo:fields(rs) ++ fields("config"); +fields(mongodb_sharded) -> + emqx_connector_mongo:fields(sharded) ++ fields("config"); +fields(mongodb_single) -> + emqx_connector_mongo:fields(single) ++ fields("config"); +fields("post_rs") -> + fields(mongodb_rs); +fields("post_sharded") -> + fields(mongodb_sharded); +fields("post_single") -> + fields(mongodb_single); +fields("put_rs") -> + fields(mongodb_rs); +fields("put_sharded") -> + fields(mongodb_sharded); +fields("put_single") -> + fields(mongodb_single); +fields("get_rs") -> + emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_rs); +fields("get_sharded") -> + emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_sharded); +fields("get_single") -> + emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_single). + +conn_bridge_examples(Method) -> + [ + #{ + <<"mongodb_rs">> => #{ + summary => <<"MongoDB (Replica Set) Bridge">>, + value => values(mongodb_rs, Method) + } + }, + #{ + <<"mongodb_sharded">> => #{ + summary => <<"MongoDB (Sharded) Bridge">>, + value => values(mongodb_sharded, Method) + } + }, + #{ + <<"mongodb_single">> => #{ + summary => <<"MongoDB (Standalone) Bridge">>, + value => values(mongodb_single, Method) + } + } + ]. + +desc("config") -> + ?DESC("desc_config"); +desc(mongodb_rs) -> + ?DESC(mongodb_rs_conf); +desc(mongodb_sharded) -> + ?DESC(mongodb_sharded_conf); +desc(mongodb_single) -> + ?DESC(mongodb_single_conf); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for MongoDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +%%================================================================================================= +%% Internal fns +%%================================================================================================= + +values(mongodb_rs = MongoType, Method) -> + TypeOpts = #{ + servers => <<"localhost:27017, localhost:27018">>, + w_mode => <<"safe">>, + r_mode => <<"safe">>, + replica_set_name => <<"rs">> + }, + values(common, MongoType, Method, TypeOpts); +values(mongodb_sharded = MongoType, Method) -> + TypeOpts = #{ + servers => <<"localhost:27017, localhost:27018">>, + w_mode => <<"safe">> + }, + values(common, MongoType, Method, TypeOpts); +values(mongodb_single = MongoType, Method) -> + TypeOpts = #{ + server => <<"localhost:27017">>, + w_mode => <<"safe">> + }, + values(common, MongoType, Method, TypeOpts). + +values(common, MongoType, Method, TypeOpts) -> + MongoTypeBin = atom_to_binary(MongoType), + Common = #{ + name => <>, + type => MongoTypeBin, + enable => true, + collection => <<"mycol">>, + database => <<"mqtt">>, + srv_record => false, + pool_size => 8, + username => <<"myuser">>, + password => <<"mypass">> + }, + MethodVals = method_values(MongoType, Method), + Vals0 = maps:merge(MethodVals, Common), + maps:merge(Vals0, TypeOpts). + +method_values(MongoType, get) -> + Vals = method_values(MongoType, post), + maps:merge(?METRICS_EXAMPLE, Vals); +method_values(MongoType, _) -> + ConnectorType = + case MongoType of + mongodb_rs -> <<"rs">>; + mongodb_sharded -> <<"sharded">>; + mongodb_single -> <<"single">> + end, + #{mongo_type => ConnectorType}. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl new file mode 100644 index 000000000..775acf7b0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -0,0 +1,251 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_mongodb_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, rs}, + {group, sharded}, + {group, single} + | (emqx_common_test_helpers:all(?MODULE) -- group_tests()) + ]. + +group_tests() -> + [ + t_setup_via_config_and_publish, + t_setup_via_http_api_and_publish + ]. + +groups() -> + [ + {rs, group_tests()}, + {sharded, group_tests()}, + {single, group_tests()} + ]. + +init_per_group(Type = rs, Config) -> + MongoHost = os:getenv("MONGO_RS_HOST", "mongo1"), + MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")), + case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of + true -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + MongoConfig = mongo_config(MongoHost, MongoPort, Type), + [ + {mongo_host, MongoHost}, + {mongo_port, MongoPort}, + {mongo_config, MongoConfig} + | Config + ]; + false -> + {skip, no_mongo} + end; +init_per_group(Type = sharded, Config) -> + MongoHost = os:getenv("MONGO_SHARDED_HOST", "mongosharded3"), + MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")), + case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of + true -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + MongoConfig = mongo_config(MongoHost, MongoPort, Type), + [ + {mongo_host, MongoHost}, + {mongo_port, MongoPort}, + {mongo_config, MongoConfig} + | Config + ]; + false -> + {skip, no_mongo} + end; +init_per_group(Type = single, Config) -> + MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"), + MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")), + case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of + true -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + MongoConfig = mongo_config(MongoHost, MongoPort, Type), + [ + {mongo_host, MongoHost}, + {mongo_port, MongoPort}, + {mongo_config, MongoConfig} + | Config + ]; + false -> + {skip, no_mongo} + end. + +end_per_group(_Type, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok. + +init_per_testcase(_Testcase, Config) -> + catch clear_db(Config), + delete_bridge(Config), + Config. + +end_per_testcase(_Testcase, Config) -> + catch clear_db(Config), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +mongo_config(MongoHost0, MongoPort0, rs) -> + MongoHost = list_to_binary(MongoHost0), + MongoPort = integer_to_binary(MongoPort0), + Servers = <>, + Name = atom_to_binary(?MODULE), + #{ + <<"type">> => <<"mongodb_rs">>, + <<"name">> => Name, + <<"enable">> => true, + <<"collection">> => <<"mycol">>, + <<"servers">> => Servers, + <<"database">> => <<"mqtt">>, + <<"w_mode">> => <<"safe">>, + <<"replica_set_name">> => <<"rs0">> + }; +mongo_config(MongoHost0, MongoPort0, sharded) -> + MongoHost = list_to_binary(MongoHost0), + MongoPort = integer_to_binary(MongoPort0), + Servers = <>, + Name = atom_to_binary(?MODULE), + #{ + <<"type">> => <<"mongodb_sharded">>, + <<"name">> => Name, + <<"enable">> => true, + <<"collection">> => <<"mycol">>, + <<"servers">> => Servers, + <<"database">> => <<"mqtt">>, + <<"w_mode">> => <<"safe">> + }; +mongo_config(MongoHost0, MongoPort0, single) -> + MongoHost = list_to_binary(MongoHost0), + MongoPort = integer_to_binary(MongoPort0), + Server = <>, + Name = atom_to_binary(?MODULE), + #{ + <<"type">> => <<"mongodb_single">>, + <<"name">> => Name, + <<"enable">> => true, + <<"collection">> => <<"mycol">>, + <<"server">> => Server, + <<"database">> => <<"mqtt">>, + <<"w_mode">> => <<"safe">> + }. + +create_bridge(Config0 = #{<<"type">> := Type, <<"name">> := Name}) -> + Config = maps:without( + [ + <<"type">>, + <<"name">> + ], + Config0 + ), + emqx_bridge:create(Type, Name, Config). + +delete_bridge(Config) -> + #{ + <<"type">> := Type, + <<"name">> := Name + } = ?config(mongo_config, Config), + emqx_bridge:remove(Type, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +clear_db(Config) -> + #{ + <<"name">> := Name, + <<"type">> := Type, + <<"collection">> := Collection + } = ?config(mongo_config, Config), + ResourceID = emqx_bridge_resource:resource_id(Type, Name), + {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), + Selector = #{}, + {true, _} = ecpool:pick_and_do( + PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover + ), + ok. + +find_all(Config) -> + #{ + <<"name">> := Name, + <<"type">> := Type, + <<"collection">> := Collection + } = ?config(mongo_config, Config), + ResourceID = emqx_bridge_resource:resource_id(Type, Name), + emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}). + +send_message(Config, Payload) -> + #{ + <<"name">> := Name, + <<"type">> := Type + } = ?config(mongo_config, Config), + BridgeID = emqx_bridge_resource:bridge_id(Type, Name), + emqx_bridge:send_message(BridgeID, Payload). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + MongoConfig = ?config(mongo_config, Config), + ?assertMatch( + {ok, _}, + create_bridge(MongoConfig) + ), + Val = erlang:unique_integer(), + ok = send_message(Config, #{key => Val}), + ?assertMatch( + {ok, [#{<<"key">> := Val}]}, + find_all(Config) + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + MongoConfig = ?config(mongo_config, Config), + ?assertMatch( + {ok, _}, + create_bridge_http(MongoConfig) + ), + Val = erlang:unique_integer(), + ok = send_message(Config, #{key => Val}), + ?assertMatch( + {ok, [#{<<"key">> := Val}]}, + find_all(Config) + ), + ok. diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 5963b7ab0..1419c2070 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,7 +1,8 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}} + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}}, + {emqx, {path, "../../apps/emqx"}} ]}. {shell, [ diff --git a/scripts/docker-ct-apps b/scripts/docker-ct-apps index 882a68285..e00f8dd1f 100644 --- a/scripts/docker-ct-apps +++ b/scripts/docker-ct-apps @@ -2,3 +2,4 @@ apps/emqx_authn apps/emqx_authz apps/emqx_connector +lib-ee/emqx_ee_bridge From 275171d2174780808eef8efe967d81065d640281 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Sep 2022 14:37:29 -0300 Subject: [PATCH 2/3] feat: handle lists of servers in mongo servers config --- .../src/emqx_connector_mongo.erl | 32 +++- .../test/emqx_connector_mongo_tests.erl | 168 ++++++++++++++++++ 2 files changed, 198 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_connector/test/emqx_connector_mongo_tests.erl diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 5bfcbe009..778abf8c2 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -47,6 +47,10 @@ default_port => ?MONGO_DEFAULT_PORT }). +-ifdef(TEST). +-export([to_servers_raw/1]). +-endif. + %%===================================================================== roots() -> [ @@ -447,7 +451,7 @@ may_parse_srv_and_txt_records_( true -> error({missing_parameter, replica_set_name}); false -> - Config#{hosts => servers_to_bin(Servers)} + Config#{hosts => servers_to_bin(lists:flatten(Servers))} end; may_parse_srv_and_txt_records_( #{ @@ -557,9 +561,33 @@ to_servers_raw(Servers) -> fun(Server) -> emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS) end, - string:tokens(str(Servers), ", ") + split_servers(Servers) ). +split_servers(L) when is_list(L) -> + PossibleTypes = [ + list(binary()), + list(string()), + string() + ], + TypeChecks = lists:map(fun(T) -> typerefl:typecheck(T, L) end, PossibleTypes), + case TypeChecks of + [ok, _, _] -> + %% list(binary()) + lists:map(fun binary_to_list/1, L); + [_, ok, _] -> + %% list(string()) + L; + [_, _, ok] -> + %% string() + string:tokens(L, ", "); + [_, _, _] -> + %% invalid input + throw("List of servers must contain only strings") + end; +split_servers(B) when is_binary(B) -> + string:tokens(str(B), ", "). + str(A) when is_atom(A) -> atom_to_list(A); str(B) when is_binary(B) -> diff --git a/apps/emqx_connector/test/emqx_connector_mongo_tests.erl b/apps/emqx_connector/test/emqx_connector_mongo_tests.erl new file mode 100644 index 000000000..7978ed289 --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_mongo_tests.erl @@ -0,0 +1,168 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_mongo_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(DEFAULT_MONGO_PORT, 27017). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% Test cases +%%------------------------------------------------------------------------------ + +to_servers_raw_test_() -> + [ + {"single server, binary, no port", + ?_test( + ?assertEqual( + [{"localhost", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw(<<"localhost">>) + ) + )}, + {"single server, string, no port", + ?_test( + ?assertEqual( + [{"localhost", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw("localhost") + ) + )}, + {"single server, list(binary), no port", + ?_test( + ?assertEqual( + [{"localhost", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw([<<"localhost">>]) + ) + )}, + {"single server, list(string), no port", + ?_test( + ?assertEqual( + [{"localhost", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw(["localhost"]) + ) + )}, + %%%%%%%%% + {"single server, binary, with port", + ?_test( + ?assertEqual( + [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw(<<"localhost:9999">>) + ) + )}, + {"single server, string, with port", + ?_test( + ?assertEqual( + [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw("localhost:9999") + ) + )}, + {"single server, list(binary), with port", + ?_test( + ?assertEqual( + [{"localhost", 9999}], + emqx_connector_mongo:to_servers_raw([<<"localhost:9999">>]) + ) + )}, + {"single server, list(string), with port", + ?_test( + ?assertEqual( + [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw(["localhost:9999"]) + ) + )}, + %%%%%%%%% + {"multiple servers, string, no port", + ?_test( + ?assertEqual( + [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw("host1, host2") + ) + )}, + {"multiple servers, binary, no port", + ?_test( + ?assertEqual( + [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw(<<"host1, host2">>) + ) + )}, + {"multiple servers, list(string), no port", + ?_test( + ?assertEqual( + [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw(["host1", "host2"]) + ) + )}, + {"multiple servers, list(binary), no port", + ?_test( + ?assertEqual( + [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], + emqx_connector_mongo:to_servers_raw([<<"host1">>, <<"host2">>]) + ) + )}, + %%%%%%%%% + {"multiple servers, string, with port", + ?_test( + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + emqx_connector_mongo:to_servers_raw("host1:1234, host2:2345") + ) + )}, + {"multiple servers, binary, with port", + ?_test( + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + emqx_connector_mongo:to_servers_raw(<<"host1:1234, host2:2345">>) + ) + )}, + {"multiple servers, list(string), with port", + ?_test( + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + emqx_connector_mongo:to_servers_raw(["host1:1234", "host2:2345"]) + ) + )}, + {"multiple servers, list(binary), with port", + ?_test( + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + emqx_connector_mongo:to_servers_raw([<<"host1:1234">>, <<"host2:2345">>]) + ) + )}, + %%%%%%%% + {"multiple servers, invalid list(string)", + ?_test( + ?assertThrow( + _, + emqx_connector_mongo:to_servers_raw(["host1, host2"]) + ) + )}, + {"multiple servers, invalid list(binary)", + ?_test( + ?assertThrow( + _, + emqx_connector_mongo:to_servers_raw([<<"host1, host2">>]) + ) + )}, + %% TODO: handle this case?? + {"multiple servers, mixed list(binary|string)", + ?_test( + ?assertThrow( + _, + emqx_connector_mongo:to_servers_raw([<<"host1">>, "host2"]) + ) + )} + ]. From f1048babd882b1b037fe6c3b417d0dc08bb703a3 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 2 Sep 2022 11:05:24 -0300 Subject: [PATCH 3/3] test: refactor to use hocon and schema --- .../test/emqx_ee_bridge_mongodb_SUITE.erl | 191 ++++++++++-------- 1 file changed, 106 insertions(+), 85 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl index 775acf7b0..bc8c0b04f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -40,15 +40,16 @@ init_per_group(Type = rs, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + ensure_loaded(), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), emqx_mgmt_api_test_util:init_suite(), - MongoConfig = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, - {mongo_config, MongoConfig} + {mongo_config, MongoConfig}, + {mongo_type, Type}, + {mongo_name, Name} | Config ]; false -> @@ -59,15 +60,16 @@ init_per_group(Type = sharded, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + ensure_loaded(), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), emqx_mgmt_api_test_util:init_suite(), - MongoConfig = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, - {mongo_config, MongoConfig} + {mongo_config, MongoConfig}, + {mongo_type, Type}, + {mongo_name, Name} | Config ]; false -> @@ -78,15 +80,16 @@ init_per_group(Type = single, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - _ = application:load(emqx_ee_bridge), - _ = emqx_ee_bridge:module_info(), + ensure_loaded(), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), emqx_mgmt_api_test_util:init_suite(), - MongoConfig = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, - {mongo_config, MongoConfig} + {mongo_config, MongoConfig}, + {mongo_type, Type}, + {mongo_name, Name} | Config ]; false -> @@ -118,65 +121,84 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ -mongo_config(MongoHost0, MongoPort0, rs) -> - MongoHost = list_to_binary(MongoHost0), - MongoPort = integer_to_binary(MongoPort0), - Servers = <>, - Name = atom_to_binary(?MODULE), - #{ - <<"type">> => <<"mongodb_rs">>, - <<"name">> => Name, - <<"enable">> => true, - <<"collection">> => <<"mycol">>, - <<"servers">> => Servers, - <<"database">> => <<"mqtt">>, - <<"w_mode">> => <<"safe">>, - <<"replica_set_name">> => <<"rs0">> - }; -mongo_config(MongoHost0, MongoPort0, sharded) -> - MongoHost = list_to_binary(MongoHost0), - MongoPort = integer_to_binary(MongoPort0), - Servers = <>, - Name = atom_to_binary(?MODULE), - #{ - <<"type">> => <<"mongodb_sharded">>, - <<"name">> => Name, - <<"enable">> => true, - <<"collection">> => <<"mycol">>, - <<"servers">> => Servers, - <<"database">> => <<"mqtt">>, - <<"w_mode">> => <<"safe">> - }; -mongo_config(MongoHost0, MongoPort0, single) -> - MongoHost = list_to_binary(MongoHost0), - MongoPort = integer_to_binary(MongoPort0), - Server = <>, - Name = atom_to_binary(?MODULE), - #{ - <<"type">> => <<"mongodb_single">>, - <<"name">> => Name, - <<"enable">> => true, - <<"collection">> => <<"mycol">>, - <<"server">> => Server, - <<"database">> => <<"mqtt">>, - <<"w_mode">> => <<"safe">> - }. +ensure_loaded() -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok. -create_bridge(Config0 = #{<<"type">> := Type, <<"name">> := Name}) -> - Config = maps:without( - [ - <<"type">>, - <<"name">> - ], - Config0 - ), - emqx_bridge:create(Type, Name, Config). +mongo_type_bin(rs) -> + <<"mongodb_rs">>; +mongo_type_bin(sharded) -> + <<"mongodb_sharded">>; +mongo_type_bin(single) -> + <<"mongodb_single">>. + +mongo_config(MongoHost, MongoPort0, rs = Type) -> + MongoPort = integer_to_list(MongoPort0), + Servers = MongoHost ++ ":" ++ MongoPort, + Name = atom_to_binary(?MODULE), + ConfigString = + io_lib:format( + "bridges.mongodb_rs.~s {\n" + " enable = true\n" + " collection = mycol\n" + " replica_set_name = rs0\n" + " servers = [~p]\n" + " w_mode = safe\n" + " database = mqtt\n" + "}", + [Name, Servers] + ), + {Name, parse_and_check(ConfigString, Type, Name)}; +mongo_config(MongoHost, MongoPort0, sharded = Type) -> + MongoPort = integer_to_list(MongoPort0), + Servers = MongoHost ++ ":" ++ MongoPort, + Name = atom_to_binary(?MODULE), + ConfigString = + io_lib:format( + "bridges.mongodb_sharded.~s {\n" + " enable = true\n" + " collection = mycol\n" + " servers = [~p]\n" + " w_mode = safe\n" + " database = mqtt\n" + "}", + [Name, Servers] + ), + {Name, parse_and_check(ConfigString, Type, Name)}; +mongo_config(MongoHost, MongoPort0, single = Type) -> + MongoPort = integer_to_list(MongoPort0), + Server = MongoHost ++ ":" ++ MongoPort, + Name = atom_to_binary(?MODULE), + ConfigString = + io_lib:format( + "bridges.mongodb_single.~s {\n" + " enable = true\n" + " collection = mycol\n" + " server = ~p\n" + " w_mode = safe\n" + " database = mqtt\n" + "}", + [Name, Server] + ), + {Name, parse_and_check(ConfigString, Type, Name)}. + +parse_and_check(ConfigString, Type, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = mongo_type_bin(Type), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Config) -> + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + MongoConfig = ?config(mongo_config, Config), + emqx_bridge:create(Type, Name, MongoConfig). delete_bridge(Config) -> - #{ - <<"type">> := Type, - <<"name">> := Name - } = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), emqx_bridge:remove(Type, Name). create_bridge_http(Params) -> @@ -188,11 +210,9 @@ create_bridge_http(Params) -> end. clear_db(Config) -> - #{ - <<"name">> := Name, - <<"type">> := Type, - <<"collection">> := Collection - } = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + #{<<"collection">> := Collection} = ?config(mongo_config, Config), ResourceID = emqx_bridge_resource:resource_id(Type, Name), {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), Selector = #{}, @@ -202,19 +222,15 @@ clear_db(Config) -> ok. find_all(Config) -> - #{ - <<"name">> := Name, - <<"type">> := Type, - <<"collection">> := Collection - } = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + #{<<"collection">> := Collection} = ?config(mongo_config, Config), ResourceID = emqx_bridge_resource:resource_id(Type, Name), emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}). send_message(Config, Payload) -> - #{ - <<"name">> := Name, - <<"type">> := Type - } = ?config(mongo_config, Config), + Name = ?config(mongo_name, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), BridgeID = emqx_bridge_resource:bridge_id(Type, Name), emqx_bridge:send_message(BridgeID, Payload). @@ -223,10 +239,9 @@ send_message(Config, Payload) -> %%------------------------------------------------------------------------------ t_setup_via_config_and_publish(Config) -> - MongoConfig = ?config(mongo_config, Config), ?assertMatch( {ok, _}, - create_bridge(MongoConfig) + create_bridge(Config) ), Val = erlang:unique_integer(), ok = send_message(Config, #{key => Val}), @@ -237,7 +252,13 @@ t_setup_via_config_and_publish(Config) -> ok. t_setup_via_http_api_and_publish(Config) -> - MongoConfig = ?config(mongo_config, Config), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + MongoConfig0 = ?config(mongo_config, Config), + MongoConfig = MongoConfig0#{ + <<"name">> => Name, + <<"type">> => Type + }, ?assertMatch( {ok, _}, create_bridge_http(MongoConfig)