diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index 85ee7c488..4773d0859 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -8,7 +8,7 @@ {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}, {epgsql, {git, "https://github.com/epgsql/epgsql", {tag, "4.4.0"}}}, %% NOTE: mind poolboy version when updating mongodb-erlang version - {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.9"}}}, + {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}}, %% NOTE: mind poolboy version when updating eredis_cluster version {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.7"}}}, %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 59c80e959..14885868b 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -20,10 +20,6 @@ -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx/include/logger.hrl"). --type server() :: emqx_schema:ip_port(). --reflect_type([server/0]). --typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}). - %% callbacks of behaviour emqx_resource -export([ on_start/2 , on_stop/2 @@ -37,6 +33,7 @@ -export([roots/0, fields/1]). -export([mongo_query/5]). + %%===================================================================== roots() -> [ {config, #{type => hoconsc:union( @@ -55,7 +52,7 @@ fields(rs) -> [ {mongo_type, #{type => rs, default => rs}} , {servers, fun servers/1} - , {replica_set_name, fun emqx_connector_schema_lib:database/1} + , {replica_set_name, fun replica_set_name/1} ] ++ mongo_fields(); fields(sharded) -> [ {mongo_type, #{type => sharded, @@ -77,7 +74,8 @@ fields(topology) -> ]. mongo_fields() -> - [ {pool_size, fun emqx_connector_schema_lib:pool_size/1} + [ {srv_record, fun srv_record/1} + , {pool_size, fun emqx_connector_schema_lib:pool_size/1} , {username, fun emqx_connector_schema_lib:username/1} , {password, fun emqx_connector_schema_lib:password/1} , {auth_source, #{type => binary(), @@ -92,35 +90,32 @@ on_jsonify(Config) -> Config. %% =================================================================== -on_start(InstId, Config = #{server := Server, - mongo_type := single}) -> - ?SLOG(info, #{msg => "starting mongodb single connector", - connector => InstId, config => Config}), - Opts = [{type, single}, - {hosts, [emqx_connector_schema_lib:ip_port_to_string(Server)]} - ], - do_start(InstId, Opts, Config); -on_start(InstId, Config = #{servers := Servers, - mongo_type := rs, - replica_set_name := RsName}) -> - ?SLOG(info, #{msg => "starting mongodb rs connector", - connector => InstId, config => Config}), - Opts = [{type, {rs, RsName}}, - {hosts, [emqx_connector_schema_lib:ip_port_to_string(S) - || S <- Servers]} - ], - do_start(InstId, Opts, Config); - -on_start(InstId, Config = #{servers := Servers, - mongo_type := sharded}) -> - ?SLOG(info, #{msg => "starting mongodb sharded connector", - connector => InstId, config => Config}), - Opts = [{type, sharded}, - {hosts, [emqx_connector_schema_lib:ip_port_to_string(S) - || S <- Servers]} - ], - do_start(InstId, Opts, Config). +on_start(InstId, Config = #{mongo_type := Type, + pool_size := PoolSize, + ssl := SSL}) -> + Msg = case Type of + single -> "starting_mongodb_single_connector"; + rs -> "starting_mongodb_replica_set_connector"; + sharded -> "starting_mongodb_sharded_connector" + end, + ?SLOG(info, #{msg => Msg, connector => InstId, config => Config}), + NConfig = may_parse_srv_and_txt_records(Config), + SslOpts = case maps:get(enable, SSL) of + true -> + [{ssl, true}, + {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)} + ]; + false -> [{ssl, false}] + end, + Topology = maps:get(topology, NConfig, #{}), + Opts = [{type, init_type(NConfig)}, + {pool_size, PoolSize}, + {options, init_topology_options(maps:to_list(Topology), [])}, + {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}], + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts), + {ok, #{poolname => PoolName, type => Type}}. on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{msg => "stopping mongodb connector", @@ -184,39 +179,10 @@ mongo_query(Conn, find_one, Collection, Selector, Projector) -> mongo_query(_Conn, _Action, _Collection, _Selector, _Projector) -> ok. -do_start(InstId, Opts0, Config = #{mongo_type := Type, - database := Database, - pool_size := PoolSize, - ssl := SSL}) -> - SslOpts = case maps:get(enable, SSL) of - true -> - [{ssl, true}, - {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)} - ]; - false -> [{ssl, false}] - end, - Topology= maps:get(topology, Config, #{}), - Opts = Opts0 ++ - [{pool_size, PoolSize}, - {options, init_topology_options(maps:to_list(Topology), [])}, - {worker_options, init_worker_options(maps:to_list(Config), SslOpts)}], - %% test the connection - TestOpts = case maps:is_key(server, Config) of - true -> - Server = maps:get(server, Config), - host_port(Server); - false -> - Servers = maps:get(servers, Config), - host_port(erlang:hd(Servers)) - end ++ [{database, Database}], - {ok, TestConn} = mc_worker_api:connect(TestOpts), - - PoolName = emqx_plugin_libs_pool:pool_name(InstId), - _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), - {ok, #{poolname => PoolName, - type => Type, - test_conn => TestConn, - test_opts => TestOpts}}. +init_type(#{type := rs, replica_set_name := ReplicaSetName}) -> + {rs, ReplicaSetName}; +init_type(#{type := Type}) -> + Type. init_topology_options([{pool_size, Val}| R], Acc) -> init_topology_options(R, [{pool_size, Val}| Acc]); @@ -261,17 +227,106 @@ init_worker_options([_ | R], Acc) -> init_worker_options(R, Acc); init_worker_options([], Acc) -> Acc. -host_port({Host, Port}) -> - [{host, Host}, {port, Port}]. - -server(type) -> server(); +server(type) -> binary(); server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")]; server(_) -> undefined. -servers(type) -> hoconsc:array(server()); +servers(type) -> binary(); servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")]; servers(_) -> undefined. duration(type) -> emqx_schema:duration_ms(); duration(nullable) -> true; duration(_) -> undefined. + +replica_set_name(type) -> binary(); +replica_set_name(nullable) -> true; +replica_set_name(_) -> undefined. + +srv_record(type) -> boolean(); +srv_record(default) -> false; +srv_record(_) -> undefined. + +parse_servers(Type, Servers) when is_binary(Servers) -> + parse_servers(Type, binary_to_list(Servers)); +parse_servers(Type, Servers) when is_list(Servers) -> + case string:split(Servers, ",", trailing) of + [Host | _] when Type =:= single -> + [Host]; + Hosts -> + Hosts + end. + +may_parse_srv_and_txt_records(#{server := Server} = Config) -> + NConfig = maps:remove(server, Config), + may_parse_srv_and_txt_records_(NConfig#{servers => Server}); +may_parse_srv_and_txt_records(Config) -> + may_parse_srv_and_txt_records_(Config). + +may_parse_srv_and_txt_records_(#{mongo_type := Type, + srv_record := false, + servers := Servers} = Config) -> + case Type =:= rs andalso maps:is_key(replica_set_name, Config) =:= false of + true -> + error({missing_parameter, replica_set_name}); + false -> + Config#{hosts => parse_servers(Type, Servers)} + end; +may_parse_srv_and_txt_records_(#{mongo_type := Type, + srv_record := true, + servers := Servers} = Config) -> + NServers = binary_to_list(Servers), + Hosts = parse_srv_records(Type, NServers), + ExtraOpts = parse_txt_records(Type, NServers), + maps:merge(Config#{hosts => Hosts}, ExtraOpts). + +parse_srv_records(Type, Server) -> + case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of + [] -> + error(service_not_found); + Services -> + case [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] of + [H | _] when Type =:= single -> + [H]; + Hosts -> + Hosts + end + end. + +parse_txt_records(Type, Server) -> + case inet_res:lookup(Server, in, txt) of + [] -> + #{}; + [[QueryString]] -> + case uri_string:dissect_query(QueryString) of + {error, _, _} -> + error({invalid_txt_record, invalid_query_string}); + Options -> + Fields = case Type of + rs -> ["authSource", "replicaSet"]; + _ -> ["authSource"] + end, + take_and_convert(Fields, Options) + end; + _ -> + error({invalid_txt_record, multiple_records}) + end. + +take_and_convert(Fields, Options) -> + take_and_convert(Fields, Options, #{}). + +take_and_convert([], [_ | _], _Acc) -> + error({invalid_txt_record, invalid_option}); +take_and_convert([], [], Acc) -> + Acc; +take_and_convert([Field | More], Options, Acc) -> + case lists:keytake(Field, 1, Options) of + {value, {"authSource", V}, NOptions} -> + take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)}); + {value, {"replicaSet", V}, NOptions} -> + take_and_convert(More, NOptions, Acc#{replica_set_name => list_to_binary(V)}); + {value, _, _} -> + error({invalid_txt_record, invalid_option}); + false -> + take_and_convert(More, Options, Acc) + end.