From 28735dc6d7f5c9b4d6504465052d8dfa26693a53 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 8 Feb 2022 10:54:46 +0800 Subject: [PATCH] refactor(connector): parse servers for `rs` and `sharded` mongo_type --- .../src/emqx_connector_mongo.erl | 115 +++++++++++------- 1 file changed, 71 insertions(+), 44 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 3a76bba5e..05f30c2ee 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -298,6 +298,8 @@ server(_) -> undefined. servers(type) -> binary(); servers(nullable) -> false; servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")]; +servers(converter) -> fun to_servers_raw/1; +servers(desc) -> ?SERVERS_DESC ++ server(desc); servers(_) -> undefined. w_mode(type) -> hoconsc:enum([unsafe, safe]); @@ -323,19 +325,9 @@ srv_record(_) -> undefined. %% =================================================================== %% Internal funcs -parse_servers(Type, Servers) when is_binary(Servers) -> - parse_servers(Type, binary_to_list(Servers)); -parse_servers(Type, Servers) when is_list(Servers) -> - case string:split(Servers, ",", all) of - [Host | _] when Type =:= single -> - [Host]; - Hosts -> - Hosts - end. - may_parse_srv_and_txt_records(#{server := Server} = Config) -> NConfig = maps:remove(server, Config), - may_parse_srv_and_txt_records_(NConfig#{servers => Server}); + may_parse_srv_and_txt_records_(NConfig#{servers => [Server]}); may_parse_srv_and_txt_records(Config) -> may_parse_srv_and_txt_records_(Config). @@ -346,47 +338,52 @@ may_parse_srv_and_txt_records_(#{mongo_type := Type, true -> error({missing_parameter, replica_set_name}); false -> - Config#{hosts => parse_servers(Type, Servers)} + Config#{hosts => servers_to_bin(Servers)} end; may_parse_srv_and_txt_records_(#{mongo_type := Type, srv_record := true, servers := Servers} = Config) -> - NServers = binary_to_list(Servers), - Hosts = parse_srv_records(Type, NServers), - ExtraOpts = parse_txt_records(Type, NServers), + Hosts = parse_srv_records(Type, Servers), + ExtraOpts = parse_txt_records(Type, Servers), maps:merge(Config#{hosts => Hosts}, ExtraOpts). -parse_srv_records(Type, Server) -> - case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of - [] -> - error(service_not_found); - Services -> - case [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] of - [H | _] when Type =:= single -> - [H]; - Hosts -> - Hosts - end +parse_srv_records(Type, Servers) -> + Fun = fun(AccIn, {IpOrHost, _Port}) -> + case inet_res:lookup("_mongodb._tcp." ++ ip_or_host_to_string(IpOrHost), in, srv) of + [] -> + error(service_not_found); + Services -> + [ [server_to_bin({Host, Port}) || {_, _, Port, Host} <- Services] + | AccIn] + end + end, + Res = lists:foldl(Fun, [], Servers), + case Type of + single -> lists:nth(1, Res); + _ -> Res end. -parse_txt_records(Type, Server) -> - case inet_res:lookup(Server, in, txt) of - [] -> - #{}; - [[QueryString]] -> - case uri_string:dissect_query(QueryString) of - {error, _, _} -> - error({invalid_txt_record, invalid_query_string}); - Options -> - Fields = case Type of - rs -> ["authSource", "replicaSet"]; - _ -> ["authSource"] - end, - take_and_convert(Fields, Options) - end; - _ -> - error({invalid_txt_record, multiple_records}) - end. +parse_txt_records(Type, Servers) -> + Fun = fun(AccIn, {IpOrHost, _Port}) -> + case inet_res:lookup(IpOrHost, in, txt) of + [] -> + #{}; + [[QueryString]] -> + case uri_string:dissect_query(QueryString) of + {error, _, _} -> + error({invalid_txt_record, invalid_query_string}); + Options -> + Fields = case Type of + rs -> ["authSource", "replicaSet"]; + _ -> ["authSource"] + end, + maps:merge(AccIn, take_and_convert(Fields, Options)) + end; + _ -> + error({invalid_txt_record, multiple_records}) + end + end, + lists:foldl(Fun, #{}, Servers). take_and_convert(Fields, Options) -> take_and_convert(Fields, Options, #{}). @@ -407,6 +404,21 @@ take_and_convert([Field | More], Options, Acc) -> take_and_convert(More, Options, Acc) end. +-spec ip_or_host_to_string(binary() | string() | tuple()) + -> string(). +ip_or_host_to_string(Ip) when is_tuple(Ip) -> + inet:ntoa(Ip); +ip_or_host_to_string(Host) -> + str(Host). + +servers_to_bin([Server | Rest]) -> + [server_to_bin(Server) | servers_to_bin(Rest)]; +servers_to_bin([]) -> + []. + +server_to_bin({IpOrHost, Port}) -> + iolist_to_binary(ip_or_host_to_string(IpOrHost) ++ ":" ++ integer_to_list(Port)). + %% =================================================================== %% typereflt funcs @@ -414,3 +426,18 @@ take_and_convert([Field | More], Options, Acc) -> -> {string(), pos_integer()}. to_server_raw(Server) -> emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS). + +-spec to_servers_raw(string()) + -> [{string(), pos_integer()}]. +to_servers_raw(Servers) -> + lists:map( fun(Server) -> + emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS) + end + , string:tokens(str(Servers), ", ")). + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S.