From 53df218e6a57d8faea9c5d37dbebc99af9f8b02e Mon Sep 17 00:00:00 2001 From: Rory Z Date: Tue, 6 Jul 2021 14:53:28 +0800 Subject: [PATCH] feat(connector): mongo support replica set --- .../docker-compose-mongo-replicaset-tcp.yaml | 81 ++++++++++ .../docker-compose-mongo-replicaset-tls.yaml | 98 ++++++++++++ ...l => docker-compose-mongo-single-tcp.yaml} | 0 ...l => docker-compose-mongo-single-tls.yaml} | 0 apps/emqx_authz/src/emqx_authz.erl | 1 + .../src/emqx_connector_mongo.erl | 143 +++++++++++------- .../src/emqx_connector_redis.erl | 6 +- 7 files changed, 275 insertions(+), 54 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml create mode 100644 .ci/docker-compose-file/docker-compose-mongo-replicaset-tls.yaml rename .ci/docker-compose-file/{docker-compose-mongo-tcp.yaml => docker-compose-mongo-single-tcp.yaml} (100%) rename .ci/docker-compose-file/{docker-compose-mongo-tls.yaml => docker-compose-mongo-single-tls.yaml} (100%) diff --git a/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml new file mode 100644 index 000000000..f83fe0932 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml @@ -0,0 +1,81 @@ +version: "3" + +services: + mongo1: + hostname: mongo1 + container_name: mongo1 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27011:27017 + restart: always + command: + --ipv6 + --bind_ip_all + --replSet rs0 + + mongo2: + hostname: mongo2 + container_name: mongo2 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27012:27017 + restart: always + command: + --ipv6 + --bind_ip_all + --replSet rs0 + + mongo3: + hostname: mongo3 + container_name: mongo3 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27013:27017 + restart: always + command: + --ipv6 + --bind_ip_all + --replSet rs0 + + mongo_client: + image: mongo:${MONGO_TAG} + container_name: mongo_client + networks: + - emqx_bridge + depends_on: + - mongo1 + - mongo2 + - mongo3 + command: + - /bin/bash + - -c + - | + while ! mongo --host mongo1 --eval 'db.runCommand("ping").ok' --quiet > /dev/null 2>&1; do + sleep 1 + done + while ! mongo --host mongo2 --eval 'db.runCommand("ping").ok' --quiet > /dev/null 2>&1; do + sleep 1 + done + while ! mongo --host mongo3 --eval 'db.runCommand("ping").ok' --quiet > /dev/null 2>&1; do + sleep 1 + done + mongo --host mongo1 --eval "rs.initiate( { _id : 'rs0', members: [ { _id : 0, host : 'mongo1:27017' }, { _id : 1, host : 'mongo2:27017' }, { _id : 2, host : 'mongo3:27017' } ] })" --quiet + mongo --host mongo1 --eval "rs.status()" --quiet diff --git a/.ci/docker-compose-file/docker-compose-mongo-replicaset-tls.yaml b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tls.yaml new file mode 100644 index 000000000..be8f7ea21 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-mongo-replicaset-tls.yaml @@ -0,0 +1,98 @@ +version: "3" + +services: + mongo1: + hostname: mongo1 + container_name: mongo1 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27011:27017 + restart: always + volumes: + - ../../apps/emqx/etc/certs/cert.pem:/etc/certs/cert.pem + - ../../apps/emqx/etc/certs/key.pem:/etc/certs/key.pem + command: + - /bin/bash + - -c + - | + cat /etc/certs/key.pem /etc/certs/cert.pem > /etc/certs/mongodb.pem + mongod --ipv6 --bind_ip_all --tlsMode requireTLS --tlsCertificateKeyFile /etc/certs/mongodb.pem --replSet rs0 + + mongo2: + hostname: mongo2 + container_name: mongo2 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27012:27017 + restart: always + volumes: + - ../../apps/emqx/etc/certs/cert.pem:/etc/certs/cert.pem + - ../../apps/emqx/etc/certs/key.pem:/etc/certs/key.pem + command: + - /bin/bash + - -c + - | + cat /etc/certs/key.pem /etc/certs/cert.pem > /etc/certs/mongodb.pem + mongod --ipv6 --bind_ip_all --tlsMode requireTLS --tlsCertificateKeyFile /etc/certs/mongodb.pem --replSet rs0 + + mongo3: + hostname: mongo3 + container_name: mongo3 + image: mongo:${MONGO_TAG} + environment: + MONGO_INITDB_DATABASE: mqtt + networks: + - emqx_bridge + expose: + - 27017 + ports: + - 27013:27017 + restart: always + volumes: + - ../../apps/emqx/etc/certs/cert.pem:/etc/certs/cert.pem + - ../../apps/emqx/etc/certs/key.pem:/etc/certs/key.pem + command: + - /bin/bash + - -c + - | + cat /etc/certs/key.pem /etc/certs/cert.pem > /etc/certs/mongodb.pem + mongod --ipv6 --bind_ip_all --tlsMode requireTLS --tlsCertificateKeyFile /etc/certs/mongodb.pem --replSet rs0 + + mongo_client: + image: mongo:${MONGO_TAG} + container_name: mongo_client + networks: + - emqx_bridge + depends_on: + - mongo1 + - mongo2 + - mongo3 + volumes: + - ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/cacert.pem + command: + - /bin/bash + - -c + - | + while ! mongo --host mongo1 --tls --tlsCAFile /etc/certs/cacert.pem --tlsAllowInvalidHostnames --eval 'db.runCommand("ping").ok' --quiet > /dev/null 2>&1; do + sleep 1 + done + while ! mongo --host mongo2 --tls --tlsCAFile /etc/certs/cacert.pem --tlsAllowInvalidHostnames --eval 'db.runCommand("ping").ok' --quiet > /dev/null 2>&1; do + sleep 1 + done + while ! mongo --host mongo3 --tls --tlsCAFile /etc/certs/cacert.pem --tlsAllowInvalidHostnames --eval 'db.runCommand("ping").ok' --quiet > /dev/null 2>&1; do + sleep 1 + done + mongo --host mongo1 --tls --tlsCAFile /etc/certs/cacert.pem --tlsAllowInvalidHostnames --eval "rs.initiate( { _id : 'rs0', members: [ { _id : 0, host : 'mongo1:27017' }, { _id : 1, host : 'mongo2:27017' }, { _id : 2, host : 'mongo3:27017' } ] })" --quiet + mongo --host mongo1 --tls --tlsCAFile /etc/certs/cacert.pem --tlsAllowInvalidHostnames --eval "rs.status()" --quiet diff --git a/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml similarity index 100% rename from .ci/docker-compose-file/docker-compose-mongo-tcp.yaml rename to .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml diff --git a/.ci/docker-compose-file/docker-compose-mongo-tls.yaml b/.ci/docker-compose-file/docker-compose-mongo-single-tls.yaml similarity index 100% rename from .ci/docker-compose-file/docker-compose-mongo-tls.yaml rename to .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 578f6d7cc..725d884e1 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -66,6 +66,7 @@ create_resource(#{type := DB, ResourceID = iolist_to_binary([io_lib:format("~s_~s",[?APP, DB]), "_", integer_to_list(erlang:system_time())]), NConfig = case DB of redis -> #{config => Config }; + mongo -> #{config => Config }; _ -> Config end, case emqx_resource:check_and_create( diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 25d9f36df..bac97a41a 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -19,6 +19,9 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-type server() :: string(). +-reflect_type([server/0]). + %% callbacks of behaviour emqx_resource -export([ on_start/2 , on_stop/2 @@ -36,16 +39,28 @@ structs() -> [""]. fields("") -> - [ {mongo_type, fun mongo_type/1} + [ {config, #{type => hoconsc:union( + [ hoconsc:ref(?MODULE, single) + , hoconsc:ref(?MODULE, rs) + , hoconsc:ref(?MODULE, sharded) + ])}} + ]; +fields(single) -> + [ {mongo_type, #{type => single, + default => single}} + , {server, fun server/1} + ] ++ mongo_fields(); +fields(rs) -> + [ {mongo_type, #{type => rs, + default => rs}} , {servers, fun servers/1} - , {pool_size, fun emqx_connector_schema_lib:pool_size/1} - , {login, fun emqx_connector_schema_lib:username/1} - , {password, fun emqx_connector_schema_lib:password/1} - , {auth_source, fun auth_source/1} - , {database, fun emqx_connector_schema_lib:database/1} - ] ++ - % mongodb_rs_set_name_fields() ++ - emqx_connector_schema_lib:ssl_fields(); + , {replicaset_name, fun emqx_connector_schema_lib:database/1} + ] ++ mongo_fields(); +fields(sharded) -> + [ {mongo_type, #{type => sharded, + default => sharded}} + , {servers, fun servers/1} + ] ++ mongo_fields(); fields(topology) -> [ {max_overflow, fun emqx_connector_schema_lib:pool_size/1} , {overflow_ttl, fun duration/1} @@ -59,40 +74,42 @@ fields(topology) -> , {min_heartbeat_frequency_ms, fun duration/1} ]. +mongo_fields() -> + [ {pool_size, fun emqx_connector_schema_lib:pool_size/1} + , {login, fun emqx_connector_schema_lib:username/1} + , {password, fun emqx_connector_schema_lib:password/1} + , {auth_source, fun auth_source/1} + , {database, fun emqx_connector_schema_lib:database/1} + ] ++ + emqx_connector_schema_lib:ssl_fields(). + on_jsonify(Config) -> Config. %% =================================================================== -on_start(InstId, #{servers := Servers, - mongo_type := Type, - database := Database, - pool_size := PoolSize, - ssl := SSL} = Config) -> +on_start(InstId, #{config := #{server := Server, + mongo_type := single} = Config}) -> logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), - SslOpts = case maps:get(enable, SSL) of - true -> - [{ssl, true}, - {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)} - ]; - false -> [{ssl, false}] - end, - Hosts = [string:trim(H) || H <- string:tokens(binary_to_list(Servers), ",")], - Opts = [{type, init_type(Type, Config)}, - {hosts, Hosts}, - {pool_size, PoolSize}, - {options, init_topology_options(maps:to_list(Config), [])}, - {worker_options, init_worker_options(maps:to_list(Config), SslOpts)}], + Opts = [{type, single}, + {hosts, [Server]} + ], + do_start(InstId, Opts, Config); - %% test the connection - TestOpts = [{database, Database}] ++ host_port(hd(Hosts)), - {ok, TestConn} = mc_worker_api:connect(TestOpts), +on_start(InstId, #{config := #{servers := Servers, + mongo_type := rs, + replicaset_name := RsName} = Config}) -> + logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), + Opts = [{type, {rs, RsName}}, + {hosts, Servers}], + do_start(InstId, Opts, Config); - PoolName = emqx_plugin_libs_pool:pool_name(InstId), - _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), - {ok, #{poolname => PoolName, - type => Type, - test_conn => TestConn, - test_opts => TestOpts}}. +on_start(InstId, #{config := #{servers := Servers, + mongo_type := sharded} = Config}) -> + logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), + Opts = [{type, sharded}, + {hosts, Servers} + ], + do_start(InstId, Opts, Config). on_stop(InstId, #{poolname := PoolName}) -> logger:info("stopping mongodb connector: ~p", [InstId]), @@ -138,10 +155,38 @@ mongo_query(Conn, find, Collection, Selector, Docs) -> mongo_query(_Conn, _Action, _Collection, _Selector, _Docs) -> ok. -init_type(rs, #{rs_set_name := Name}) -> - {rs, Name}; -init_type(Type, _Opts) -> - Type. +do_start(InstId, Opts0, Config = #{mongo_type := Type, + database := Database, + pool_size := PoolSize, + ssl := SSL}) -> + SslOpts = case maps:get(enable, SSL) of + true -> + [{ssl, true}, + {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)} + ]; + false -> [{ssl, false}] + end, + Opts = Opts0 ++ + [{pool_size, PoolSize}, + {options, init_topology_options(maps:to_list(Config), [])}, + {worker_options, init_worker_options(maps:to_list(Config), SslOpts)}], + %% test the connection + TestOpts = case maps:is_key(server, Config) of + true -> + Server = maps:get(server, Config), + host_port(Server); + false -> + Servers = maps:get(servers, Config), + host_port(erlang:hd(Servers)) + end ++ [{database, Database}], + {ok, TestConn} = mc_worker_api:connect(TestOpts), + + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), + {ok, #{poolname => PoolName, + type => Type, + test_conn => TestConn, + test_opts => TestOpts}}. init_topology_options([{pool_size, Val}| R], Acc) -> init_topology_options(R, [{pool_size, Val}| Acc]); @@ -196,22 +241,18 @@ host_port(HostPort) -> [{host, Host1}] end. -% mongodb_rs_set_name_fields() -> -% [ {rs_set_name, fun emqx_connector_schema_lib:database/1} -% ]. +server(type) -> server(); +server(validator) -> [?REQUIRED("the field 'server' is required")]; +server(_) -> undefined. + +servers(type) -> hoconsc:array(server()); +servers(validator) -> [?REQUIRED("the field 'servers' is required")]; +servers(_) -> undefined. auth_source(type) -> binary(); auth_source(nullable) -> true; auth_source(_) -> undefined. -servers(type) -> binary(); -servers(validator) -> [?REQUIRED("the field 'servers' is required")]; -servers(_) -> undefined. - -mongo_type(type) -> hoconsc:enum([single, unknown, shared, rs]); -mongo_type(default) -> single; -mongo_type(_) -> undefined. - duration(type) -> emqx_schema:duration_ms(); duration(nullable) -> true; duration(_) -> undefined. diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 6333cdb1a..0df12185d 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -45,9 +45,9 @@ structs() -> [""]. fields("") -> [ {config, #{type => hoconsc:union( - [ hoconsc:ref(cluster) - , hoconsc:ref(single) - , hoconsc:ref(sentinel) + [ hoconsc:ref(?MODULE, cluster) + , hoconsc:ref(?MODULE, single) + , hoconsc:ref(?MODULE, sentinel) ])} } ];