diff --git a/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml index f83fe0932..54506abd8 100644 --- a/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml @@ -18,7 +18,7 @@ services: --ipv6 --bind_ip_all --replSet rs0 - + mongo2: hostname: mongo2 container_name: mongo2 @@ -54,10 +54,10 @@ services: --ipv6 --bind_ip_all --replSet rs0 - - mongo_client: + + mongo_rs_client: image: mongo:${MONGO_TAG} - container_name: mongo_client + container_name: mongo_rs_client networks: - emqx_bridge depends_on: diff --git a/.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml new file mode 100644 index 000000000..a8b51689b --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml @@ -0,0 +1,90 @@ +version: "3" + +services: + mongosharded1: + hostname: mongosharded1 + container_name: mongosharded1 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27014:27017 + restart: always + command: + --configsvr + --replSet cfg0 + --port 27017 + --ipv6 + --bind_ip_all + + mongosharded2: + hostname: mongosharded2 + container_name: mongosharded2 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27015:27017 + restart: always + command: + --shardsvr + --replSet rs0 + --port 27017 + --ipv6 + --bind_ip_all + + mongosharded3: + hostname: mongosharded3 + container_name: mongosharded3 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27016:27017 + restart: always + entrypoint: mongos + command: + --configdb cfg0/mongosharded1:27017 + --port 27017 + --ipv6 + --bind_ip_all + + mongosharded_client: + image: mongo:${MONGO_TAG} + container_name: mongosharded_client + networks: + - emqx_bridge + depends_on: + - mongosharded1 + - mongosharded2 + - mongosharded3 + command: + - /bin/bash + - -c + - | + while ! mongo --host mongosharded1 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do + sleep 1 + done + mongo --host mongosharded1 --eval "rs.initiate( { _id : 'cfg0', configsvr: true, members: [ { _id : 0, host : 'mongosharded1:27017' } ] })" + while ! mongo --host mongosharded2 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do + sleep 1 + done + mongo --host mongosharded2 --eval "rs.initiate( { _id : 'rs0', members: [ { _id : 0, host : 'mongosharded2:27017' } ] })" + mongo --host mongosharded2 --eval "rs.status()" + while ! mongo --host mongosharded3 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do + sleep 1 + done + mongo --host mongosharded3 --eval "sh.addShard('rs0/mongosharded2:27017')" + mongo --host mongosharded3 --eval "sh.enableSharding('mqtt')" diff --git a/.github/workflows/elixir_release.yml b/.github/workflows/elixir_release.yml index 006d6aba8..b703f2fde 100644 --- a/.github/workflows/elixir_release.yml +++ b/.github/workflows/elixir_release.yml @@ -12,7 +12,18 @@ on: jobs: elixir_release_build: runs-on: ubuntu-latest - container: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04 + strategy: + matrix: + otp: + - 24.2.1-1 + elixir: + - 1.13.4 + os: + - ubuntu20.04 + profile: + - emqx + - emqx-enterprise + container: ghcr.io/emqx/emqx-builder/5.0-17:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }} steps: - name: Checkout @@ -23,15 +34,15 @@ jobs: run: | git config --global --add safe.directory "$GITHUB_WORKSPACE" - name: elixir release - run: make emqx-elixir + run: make ${{ matrix.profile }}-elixir - name: start release run: | - cd _build/emqx/rel/emqx + cd _build/${{ matrix.profile }}/rel/emqx bin/emqx start - name: check if started run: | sleep 10 nc -zv localhost 1883 - cd _build/emqx/rel/emqx + cd _build/${{ matrix.profile }}/rel/emqx bin/emqx ping bin/emqx ctl status diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index b4b797621..5d0dd1ca2 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -124,6 +124,8 @@ jobs: docker-compose \ -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \ + -f .ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tls.yaml \ -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \ @@ -135,6 +137,11 @@ jobs: -f .ci/docker-compose-file/docker-compose.yaml \ up -d --build + - name: wait some services to be fully up + run: | + docker wait mongosharded_client + docker wait mongo_rs_client + # produces .coverdata - name: run common test working-directory: source diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index f55ac840e..af74ae2ec 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -41,12 +41,16 @@ api_schema(Method) -> hoconsc:union(Broker ++ EE). ee_api_schemas(Method) -> + %% must ensure the app is loaded before checking if fn is defined. + ensure_loaded(emqx_ee_bridge, emqx_ee_bridge), case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of true -> emqx_ee_bridge:api_schemas(Method); false -> [] end. ee_fields_bridges() -> + %% must ensure the app is loaded before checking if fn is defined. + ensure_loaded(emqx_ee_bridge, emqx_ee_bridge), case erlang:function_exported(emqx_ee_bridge, fields, 1) of true -> emqx_ee_bridge:fields(bridges); false -> [] @@ -155,3 +159,17 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. + +%%================================================================================================= +%% Internal fns +%%================================================================================================= + +ensure_loaded(App, Mod) -> + try + _ = application:load(App), + _ = Mod:module_info(), + ok + catch + _:_ -> + ok + end. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 07208545f..5bfcbe009 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -37,7 +37,7 @@ -export([roots/0, fields/1, desc/1]). --export([mongo_query/5, check_worker_health/1]). +-export([mongo_query/5, mongo_insert/3, check_worker_health/1]). -define(HEALTH_CHECK_TIMEOUT, 30000). @@ -177,9 +177,16 @@ on_start( {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), + Collection = maps:get(collection, Config, <<"mqtt">>), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of - ok -> {ok, #{poolname => PoolName, type => Type}}; - {error, Reason} -> {error, Reason} + ok -> + {ok, #{ + poolname => PoolName, + type => Type, + collection => Collection + }}; + {error, Reason} -> + {error, Reason} end. on_stop(InstId, #{poolname := PoolName}) -> @@ -189,6 +196,35 @@ on_stop(InstId, #{poolname := PoolName}) -> }), emqx_plugin_libs_pool:stop_pool(PoolName). +on_query( + InstId, + {send_message, Document}, + #{poolname := PoolName, collection := Collection} = State +) -> + Request = {insert, Collection, Document}, + ?TRACE( + "QUERY", + "mongodb_connector_received", + #{request => Request, connector => InstId, state => State} + ), + case + ecpool:pick_and_do( + PoolName, + {?MODULE, mongo_insert, [Collection, Document]}, + no_handover + ) + of + {{false, Reason}, _Document} -> + ?SLOG(error, #{ + msg => "mongodb_connector_do_query_failed", + request => Request, + reason => Reason, + connector => InstId + }), + {error, Reason}; + {{true, _Info}, _Document} -> + ok + end; on_query( InstId, {Action, Collection, Filter, Projector}, @@ -292,6 +328,9 @@ mongo_query(Conn, find_one, Collection, Filter, Projector) -> mongo_query(_Conn, _Action, _Collection, _Filter, _Projector) -> ok. +mongo_insert(Conn, Collection, Documents) -> + mongo_api:insert(Conn, Collection, Documents). + init_type(#{mongo_type := rs, replica_set_name := ReplicaSetName}) -> {rs, ReplicaSetName}; init_type(#{mongo_type := Type}) -> diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf new file mode 100644 index 000000000..fef3663ef --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf @@ -0,0 +1,67 @@ +emqx_ee_bridge_mongodb { + desc_config { + desc { + en: "Configuration for MongoDB Bridge" + zh: "为MongoDB桥配置" + } + label { + en: "MongoDB Bridge Configuration" + zh: "MongoDB桥配置" + } + } + + enable { + desc { + en: "Enable or disable this MongoDB Bridge" + zh: "启用或停用该MongoDB桥" + } + label { + en: "Enable or disable" + zh: "启用或禁用" + } + } + + collection { + desc { + en: "The collection where data will be stored into" + zh: "数据将被存储到的集合" + } + label { + en: "Collection to be used" + zh: "将要使用的藏品" + } + } + + mongodb_rs_conf { + desc { + en: "MongoDB (Replica Set) configuration" + zh: "MongoDB(Replica Set)配置" + } + label { + en: "MongoDB (Replica Set) Configuration" + zh: "MongoDB(Replica Set)配置" + } + } + + mongodb_sharded_conf { + desc { + en: "MongoDB (Sharded) configuration" + zh: "MongoDB (Sharded)配置" + } + label { + en: "MongoDB (Sharded) Configuration" + zh: "MongoDB (Sharded)配置" + } + } + + mongodb_single_conf { + desc { + en: "MongoDB (Standalone) configuration" + zh: "MongoDB(独立)配置" + } + label { + en: "MongoDB (Standalone) Configuration" + zh: "MongoDB(独立)配置" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index 5dd22ccef..e986d7983 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,5 +1,9 @@ {erl_opts, [debug_info]}. -{deps, []}. +{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} + , {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. {shell, [ {apps, [emqx_ee_bridge]} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index ad1dfaee8..840b963cd 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -15,6 +15,9 @@ api_schemas(Method) -> [ ref(emqx_ee_bridge_mysql, Method), + ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), + ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), + ref(emqx_ee_bridge_mongodb, Method ++ "_single"), ref(emqx_ee_bridge_hstreamdb, Method), ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), @@ -25,6 +28,7 @@ schema_modules() -> [ emqx_ee_bridge_hstreamdb, emqx_ee_bridge_influxdb, + emqx_ee_bridge_mongodb, emqx_ee_bridge_mysql ]. @@ -42,6 +46,9 @@ examples(Method) -> resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; +resource_type(mongodb_rs) -> emqx_connector_mongo; +resource_type(mongodb_sharded) -> emqx_connector_mongo; +resource_type(mongodb_single) -> emqx_connector_mongo; resource_type(mysql) -> emqx_connector_mysql; resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; @@ -59,7 +66,16 @@ fields(bridges) -> hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), #{desc => <<"EMQX Enterprise Config">>} )} - ] ++ fields(influxdb); + ] ++ fields(mongodb) ++ fields(influxdb); +fields(mongodb) -> + [ + {Type, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_mongodb, Type)), + #{desc => <<"EMQX Enterprise Config">>} + )} + || Type <- [mongodb_rs, mongodb_sharded, mongodb_single] + ]; fields(influxdb) -> [ {Protocol, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl new file mode 100644 index 000000000..9d9a5e4d0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_mongodb). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-behaviour(hocon_schema). + +%% emqx_ee_bridge "callbacks" +-export([ + conn_bridge_examples/1 +]). + +%% hocon_schema callbacks +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%%================================================================================================= +%% hocon_schema API +%%================================================================================================= + +namespace() -> + "bridge_mongodb". + +roots() -> + []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, + {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})} + ]; +fields(mongodb_rs) -> + emqx_connector_mongo:fields(rs) ++ fields("config"); +fields(mongodb_sharded) -> + emqx_connector_mongo:fields(sharded) ++ fields("config"); +fields(mongodb_single) -> + emqx_connector_mongo:fields(single) ++ fields("config"); +fields("post_rs") -> + fields(mongodb_rs); +fields("post_sharded") -> + fields(mongodb_sharded); +fields("post_single") -> + fields(mongodb_single); +fields("put_rs") -> + fields(mongodb_rs); +fields("put_sharded") -> + fields(mongodb_sharded); +fields("put_single") -> + fields(mongodb_single); +fields("get_rs") -> + emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_rs); +fields("get_sharded") -> + emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_sharded); +fields("get_single") -> + emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_single). + +conn_bridge_examples(Method) -> + [ + #{ + <<"mongodb_rs">> => #{ + summary => <<"MongoDB (Replica Set) Bridge">>, + value => values(mongodb_rs, Method) + } + }, + #{ + <<"mongodb_sharded">> => #{ + summary => <<"MongoDB (Sharded) Bridge">>, + value => values(mongodb_sharded, Method) + } + }, + #{ + <<"mongodb_single">> => #{ + summary => <<"MongoDB (Standalone) Bridge">>, + value => values(mongodb_single, Method) + } + } + ]. + +desc("config") -> + ?DESC("desc_config"); +desc(mongodb_rs) -> + ?DESC(mongodb_rs_conf); +desc(mongodb_sharded) -> + ?DESC(mongodb_sharded_conf); +desc(mongodb_single) -> + ?DESC(mongodb_single_conf); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for MongoDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +%%================================================================================================= +%% Internal fns +%%================================================================================================= + +values(mongodb_rs = MongoType, Method) -> + TypeOpts = #{ + servers => <<"localhost:27017, localhost:27018">>, + w_mode => <<"safe">>, + r_mode => <<"safe">>, + replica_set_name => <<"rs">> + }, + values(common, MongoType, Method, TypeOpts); +values(mongodb_sharded = MongoType, Method) -> + TypeOpts = #{ + servers => <<"localhost:27017, localhost:27018">>, + w_mode => <<"safe">> + }, + values(common, MongoType, Method, TypeOpts); +values(mongodb_single = MongoType, Method) -> + TypeOpts = #{ + server => <<"localhost:27017">>, + w_mode => <<"safe">> + }, + values(common, MongoType, Method, TypeOpts). + +values(common, MongoType, Method, TypeOpts) -> + MongoTypeBin = atom_to_binary(MongoType), + Common = #{ + name => <>, + type => MongoTypeBin, + enable => true, + collection => <<"mycol">>, + database => <<"mqtt">>, + srv_record => false, + pool_size => 8, + username => <<"myuser">>, + password => <<"mypass">> + }, + MethodVals = method_values(MongoType, Method), + Vals0 = maps:merge(MethodVals, Common), + maps:merge(Vals0, TypeOpts). + +method_values(MongoType, get) -> + Vals = method_values(MongoType, post), + maps:merge(?METRICS_EXAMPLE, Vals); +method_values(MongoType, _) -> + ConnectorType = + case MongoType of + mongodb_rs -> <<"rs">>; + mongodb_sharded -> <<"sharded">>; + mongodb_single -> <<"single">> + end, + #{mongo_type => ConnectorType}. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl new file mode 100644 index 000000000..775acf7b0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -0,0 +1,251 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_mongodb_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, rs}, + {group, sharded}, + {group, single} + | (emqx_common_test_helpers:all(?MODULE) -- group_tests()) + ]. + +group_tests() -> + [ + t_setup_via_config_and_publish, + t_setup_via_http_api_and_publish + ]. + +groups() -> + [ + {rs, group_tests()}, + {sharded, group_tests()}, + {single, group_tests()} + ]. + +init_per_group(Type = rs, Config) -> + MongoHost = os:getenv("MONGO_RS_HOST", "mongo1"), + MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")), + case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of + true -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + MongoConfig = mongo_config(MongoHost, MongoPort, Type), + [ + {mongo_host, MongoHost}, + {mongo_port, MongoPort}, + {mongo_config, MongoConfig} + | Config + ]; + false -> + {skip, no_mongo} + end; +init_per_group(Type = sharded, Config) -> + MongoHost = os:getenv("MONGO_SHARDED_HOST", "mongosharded3"), + MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")), + case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of + true -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + MongoConfig = mongo_config(MongoHost, MongoPort, Type), + [ + {mongo_host, MongoHost}, + {mongo_port, MongoPort}, + {mongo_config, MongoConfig} + | Config + ]; + false -> + {skip, no_mongo} + end; +init_per_group(Type = single, Config) -> + MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"), + MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")), + case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of + true -> + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + MongoConfig = mongo_config(MongoHost, MongoPort, Type), + [ + {mongo_host, MongoHost}, + {mongo_port, MongoPort}, + {mongo_config, MongoConfig} + | Config + ]; + false -> + {skip, no_mongo} + end. + +end_per_group(_Type, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok. + +init_per_testcase(_Testcase, Config) -> + catch clear_db(Config), + delete_bridge(Config), + Config. + +end_per_testcase(_Testcase, Config) -> + catch clear_db(Config), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +mongo_config(MongoHost0, MongoPort0, rs) -> + MongoHost = list_to_binary(MongoHost0), + MongoPort = integer_to_binary(MongoPort0), + Servers = <>, + Name = atom_to_binary(?MODULE), + #{ + <<"type">> => <<"mongodb_rs">>, + <<"name">> => Name, + <<"enable">> => true, + <<"collection">> => <<"mycol">>, + <<"servers">> => Servers, + <<"database">> => <<"mqtt">>, + <<"w_mode">> => <<"safe">>, + <<"replica_set_name">> => <<"rs0">> + }; +mongo_config(MongoHost0, MongoPort0, sharded) -> + MongoHost = list_to_binary(MongoHost0), + MongoPort = integer_to_binary(MongoPort0), + Servers = <>, + Name = atom_to_binary(?MODULE), + #{ + <<"type">> => <<"mongodb_sharded">>, + <<"name">> => Name, + <<"enable">> => true, + <<"collection">> => <<"mycol">>, + <<"servers">> => Servers, + <<"database">> => <<"mqtt">>, + <<"w_mode">> => <<"safe">> + }; +mongo_config(MongoHost0, MongoPort0, single) -> + MongoHost = list_to_binary(MongoHost0), + MongoPort = integer_to_binary(MongoPort0), + Server = <>, + Name = atom_to_binary(?MODULE), + #{ + <<"type">> => <<"mongodb_single">>, + <<"name">> => Name, + <<"enable">> => true, + <<"collection">> => <<"mycol">>, + <<"server">> => Server, + <<"database">> => <<"mqtt">>, + <<"w_mode">> => <<"safe">> + }. + +create_bridge(Config0 = #{<<"type">> := Type, <<"name">> := Name}) -> + Config = maps:without( + [ + <<"type">>, + <<"name">> + ], + Config0 + ), + emqx_bridge:create(Type, Name, Config). + +delete_bridge(Config) -> + #{ + <<"type">> := Type, + <<"name">> := Name + } = ?config(mongo_config, Config), + emqx_bridge:remove(Type, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +clear_db(Config) -> + #{ + <<"name">> := Name, + <<"type">> := Type, + <<"collection">> := Collection + } = ?config(mongo_config, Config), + ResourceID = emqx_bridge_resource:resource_id(Type, Name), + {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), + Selector = #{}, + {true, _} = ecpool:pick_and_do( + PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover + ), + ok. + +find_all(Config) -> + #{ + <<"name">> := Name, + <<"type">> := Type, + <<"collection">> := Collection + } = ?config(mongo_config, Config), + ResourceID = emqx_bridge_resource:resource_id(Type, Name), + emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}). + +send_message(Config, Payload) -> + #{ + <<"name">> := Name, + <<"type">> := Type + } = ?config(mongo_config, Config), + BridgeID = emqx_bridge_resource:bridge_id(Type, Name), + emqx_bridge:send_message(BridgeID, Payload). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + MongoConfig = ?config(mongo_config, Config), + ?assertMatch( + {ok, _}, + create_bridge(MongoConfig) + ), + Val = erlang:unique_integer(), + ok = send_message(Config, #{key => Val}), + ?assertMatch( + {ok, [#{<<"key">> := Val}]}, + find_all(Config) + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + MongoConfig = ?config(mongo_config, Config), + ?assertMatch( + {ok, _}, + create_bridge_http(MongoConfig) + ), + Val = erlang:unique_integer(), + ok = send_message(Config, #{key => Val}), + ?assertMatch( + {ok, [#{<<"key">> := Val}]}, + find_all(Config) + ), + ok. diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 5963b7ab0..1419c2070 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,7 +1,8 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}} + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}}, + {emqx, {path, "../../apps/emqx"}} ]}. {shell, [ diff --git a/scripts/docker-ct-apps b/scripts/docker-ct-apps index 882a68285..e00f8dd1f 100644 --- a/scripts/docker-ct-apps +++ b/scripts/docker-ct-apps @@ -2,3 +2,4 @@ apps/emqx_authn apps/emqx_authz apps/emqx_connector +lib-ee/emqx_ee_bridge