From 0ce1ca89b7fb1de49e502d766c9a43a4e78d781e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 24 Dec 2022 15:50:01 +0100 Subject: [PATCH] refactor: use string type for server and servers --- apps/emqx/src/emqx_config.erl | 25 ++ apps/emqx/src/emqx_config_handler.erl | 7 +- apps/emqx/src/emqx_schema.erl | 272 ++++++++++++++-- apps/emqx/test/emqx_schema_tests.erl | 291 +++++++++++++++++- .../test/emqx_authn_mysql_SUITE.erl | 3 +- .../test/emqx_authn_pgsql_SUITE.erl | 36 ++- .../test/emqx_authn_redis_SUITE.erl | 28 +- .../test/emqx_authz_mysql_SUITE.erl | 8 +- .../test/emqx_authz_postgresql_SUITE.erl | 8 +- .../test/emqx_authz_redis_SUITE.erl | 17 +- .../emqx_connector/src/emqx_connector.app.src | 2 +- .../src/emqx_connector_ldap.erl | 45 +-- .../emqx_connector/src/emqx_connector_lib.erl | 22 ++ .../src/emqx_connector_mongo.erl | 250 ++++++--------- .../src/emqx_connector_mysql.erl | 20 +- .../src/emqx_connector_pgsql.erl | 23 +- .../src/emqx_connector_redis.erl | 56 +--- .../src/emqx_connector_schema_lib.erl | 54 ---- .../src/mqtt/emqx_connector_mqtt_mod.erl | 14 +- .../src/mqtt/emqx_connector_mqtt_schema.erl | 17 +- .../test/emqx_connector_mongo_tests.erl | 270 ++++++++-------- .../test/emqx_connector_mqtt_tests.erl | 4 +- .../emqx_dashboard/src/emqx_dashboard.app.src | 2 +- .../src/emqx_dashboard_swagger.erl | 2 - apps/emqx_gateway/src/emqx_gateway.app.src | 2 +- .../src/mqttsn/emqx_sn_broadcast.erl | 6 + apps/emqx_machine/src/emqx_machine.app.src | 2 +- apps/emqx_statsd/include/emqx_statsd.hrl | 17 + apps/emqx_statsd/src/emqx_statsd.app.src | 2 +- apps/emqx_statsd/src/emqx_statsd.erl | 3 +- apps/emqx_statsd/src/emqx_statsd_schema.erl | 14 +- changes/v5.0.14-en.md | 4 + changes/v5.0.14-zh.md | 4 + .../i18n/emqx_ee_bridge_kafka.conf | 4 +- .../src/emqx_ee_bridge_kafka.erl | 18 +- .../kafka/emqx_bridge_impl_kafka_producer.erl | 6 +- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 62 ++-- .../test/emqx_ee_bridge_redis_SUITE.erl | 14 +- .../src/emqx_ee_connector_gcp_pubsub.erl | 4 +- .../src/emqx_ee_connector_influxdb.erl | 56 ++-- scripts/apps-version-check.sh | 3 +- scripts/git-hook-pre-commit.sh | 1 + 42 files changed, 1029 insertions(+), 669 deletions(-) create mode 100644 apps/emqx_connector/src/emqx_connector_lib.erl diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index f5ff64d4d..81940f191 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -413,6 +413,31 @@ check_config(SchemaMod, RawConf) -> check_config(SchemaMod, RawConf, #{}). check_config(SchemaMod, RawConf, Opts0) -> + try + do_check_config(SchemaMod, RawConf, Opts0) + catch + throw:{Schema, Errors} -> + compact_errors(Schema, Errors) + end. + +%% HOCON tries to be very informative about all the detailed errors +%% it's maybe too much when reporting to the user +-spec compact_errors(any(), any()) -> no_return(). +compact_errors(Schema, [Error0 | More]) when is_map(Error0) -> + Error1 = Error0#{discarded_errors_count => length(More)}, + Error = + case is_atom(Schema) of + true -> + Error1#{schema_module => Schema}; + false -> + Error1 + end, + throw(Error); +compact_errors(Schema, Errors) -> + %% unexpected, we need the stacktrace reported, hence error + error({Schema, Errors}). + +do_check_config(SchemaMod, RawConf, Opts0) -> Opts1 = #{ return_plain => true, format => map, diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 0311418a9..6bbd5f681 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -245,7 +245,7 @@ process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) -> BinKeyPath = bin_path(ConfKeyPath), case check_permissions(update, BinKeyPath, NewRawConf, Opts) of allow -> - OverrideConf = update_override_config(NewRawConf, Opts), + OverrideConf = merge_to_override_config(NewRawConf, Opts), {ok, NewRawConf, OverrideConf, Opts}; {deny, Reason} -> {error, {permission_denied, Reason}} @@ -447,9 +447,10 @@ remove_from_override_config(BinKeyPath, Opts) -> OldConf = emqx_config:read_override_conf(Opts), emqx_map_lib:deep_remove(BinKeyPath, OldConf). -update_override_config(_RawConf, #{persistent := false}) -> +%% apply new config on top of override config +merge_to_override_config(_RawConf, #{persistent := false}) -> undefined; -update_override_config(RawConf, Opts) -> +merge_to_override_config(RawConf, Opts) -> OldConf = emqx_config:read_override_conf(Opts), maps:merge(OldConf, RawConf). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ebe27f2a5..a8e25af8b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -40,8 +40,9 @@ -type comma_separated_atoms() :: [atom()]. -type bar_separated_list() :: list(). -type ip_port() :: tuple() | integer(). --type host_port() :: tuple(). -type cipher() :: map(). +-type port_number() :: 1..65536. +-type server_parse_option() :: #{default_port => port_number(), no_port => boolean()}. -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). @@ -53,7 +54,6 @@ -typerefl_from_string({comma_separated_binary/0, emqx_schema, to_comma_separated_binary}). -typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}). -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). --typerefl_from_string({host_port/0, emqx_schema, to_host_port}). -typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}). -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). @@ -80,11 +80,19 @@ to_comma_separated_binary/1, to_bar_separated_list/1, to_ip_port/1, - to_host_port/1, to_erl_cipher_suite/1, to_comma_separated_atoms/1 ]). +-export([ + parse_server/2, + parse_servers/2, + servers_validator/2, + servers_sc/2, + convert_servers/1, + convert_servers/2 +]). + -behaviour(hocon_schema). -reflect_type([ @@ -99,7 +107,6 @@ comma_separated_binary/0, bar_separated_list/0, ip_port/0, - host_port/0, cipher/0, comma_separated_atoms/0 ]). @@ -2172,40 +2179,15 @@ to_bar_separated_list(Str) -> %% - :1883 %% - :::1883 to_ip_port(Str) -> - to_host_port(Str, ip_addr). - -%% @doc support the following format: -%% - 127.0.0.1:1883 -%% - ::1:1883 -%% - [::1]:1883 -%% - :1883 -%% - :::1883 -%% - example.com:80 -to_host_port(Str) -> - to_host_port(Str, hostname). - -%% - example.com:80 -to_host_port(Str, IpOrHost) -> - case split_host_port(Str) of - {"", Port} when IpOrHost =:= ip_addr -> + case split_ip_port(Str) of + {"", Port} -> %% this is a local address {ok, list_to_integer(Port)}; - {"", _Port} -> - %% must specify host part when it's a remote endpoint - {error, bad_host_port}; {MaybeIp, Port} -> PortVal = list_to_integer(Port), case inet:parse_address(MaybeIp) of {ok, IpTuple} -> {ok, {IpTuple, PortVal}}; - _ when IpOrHost =:= hostname -> - %% check is a rfc1035's hostname - case inet_parse:domain(MaybeIp) of - true -> - {ok, {MaybeIp, PortVal}}; - _ -> - {error, bad_hostname} - end; _ -> {error, bad_ip_port} end; @@ -2213,7 +2195,7 @@ to_host_port(Str, IpOrHost) -> {error, bad_ip_port} end. -split_host_port(Str0) -> +split_ip_port(Str0) -> Str = re:replace(Str0, " ", "", [{return, list}, global]), case lists:split(string:rchr(Str, $:), Str) of %% no colon @@ -2376,3 +2358,229 @@ non_empty_string(<<>>) -> {error, empty_string_not_allowed}; non_empty_string("") -> {error, empty_string_not_allowed}; non_empty_string(S) when is_binary(S); is_list(S) -> ok; non_empty_string(_) -> {error, invalid_string}. + +%% @doc Make schema for 'server' or 'servers' field. +%% for each field, there are three passes: +%% 1. converter: Normalize the value. +%% This normalized value is stored in EMQX's raw config. +%% 2. validator: Validate the normalized value. +%% Besides checkin if the value can be empty or undefined +%% it also calls the 3rd pass to see if the provided +%% hosts can be successfully parsed. +%% 3. parsing: Done at runtime in each module which uses this config +servers_sc(Meta0, ParseOpts) -> + Required = maps:get(required, Meta0, true), + Meta = #{ + required => Required, + converter => fun convert_servers/2, + validator => servers_validator(ParseOpts, Required) + }, + sc(string(), maps:merge(Meta, Meta0)). + +%% @hidden Convert a deep map to host:port pairs. +%% This is due to the fact that a host:port string +%% often can be parsed as a HOCON struct. +%% e.g. when a string from environment variable is `host.domain.name:80' +%% without escaped quotes, it's parsed as +%% `#{<<"host">> => #{<<"domain">> => #{<<"name">> => 80}}}' +%% and when it is a comma-separated list of host:port pairs +%% like `h1.foo:80, h2.bar:81' then it is parsed as +%% `#{<<"h1">> => #{<<"foo">> => 80}, <<"h2">> => #{<<"bar">> => 81}}' +%% This function is to format the map back to host:port (pairs) +%% This function also tries to remove spaces around commas in comma-separated, +%% `host:port' list, and format string array to comma-separated. +convert_servers(HoconValue, _HoconOpts) -> + convert_servers(HoconValue). + +convert_servers(undefined) -> + %% should not format 'undefined' as string + %% not to throw exception either + %% (leave it to the 'required => true | false' check) + undefined; +convert_servers(Map) when is_map(Map) -> + try + List = convert_hocon_map_host_port(Map), + iolist_to_binary(string:join(List, ",")) + catch + _:_ -> + throw("bad_host_port") + end; +convert_servers([H | _] = Array) when is_binary(H) orelse is_list(H) -> + %% if the old config was a string array + %% we want to make sure it's converted to a comma-separated + iolist_to_binary([[I, ","] || I <- Array]); +convert_servers(Str) -> + normalize_host_port_str(Str). + +%% remove spaces around comma (,) +normalize_host_port_str(Str) -> + iolist_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")). + +%% @doc Shared validation function for both 'server' and 'servers' string. +%% NOTE: Validator is called after converter. +servers_validator(Opts, Required) -> + fun(Str0) -> + Str = str(Str0), + case Str =:= "" orelse Str =:= "undefined" of + true when Required -> + %% it's a required field + %% but value is set to an empty string (from environment override) + %% or when the filed is not set in config file + %% NOTE: assuming nobody is going to name their server "undefined" + throw("cannot_be_empty"); + true -> + ok; + _ -> + %% it's valid as long as it can be parsed + _ = parse_servers(Str, Opts), + ok + end + end. + +%% @doc Parse `host[:port]' endpoint to a `{Host, Port}' tuple or just `Host' string. +%% `Opt' is a `map()' with below options supported: +%% +%% `default_port': a port number, so users are not forced to configure +%% port number. +%% `no_port': by default it's `false', when set to `true', +%% a `throw' exception is raised if the port is found. +-spec parse_server(undefined | string() | binary(), server_parse_option()) -> + {string(), port_number()}. +parse_server(Str, Opts) -> + case parse_servers(Str, Opts) of + undefined -> + undefined; + [L] -> + L; + [_ | _] = L -> + throw("expecting_one_host_but_got: " ++ integer_to_list(length(L))) + end. + +%% @doc Parse comma separated `host[:port][,host[:port]]' endpoints +%% into a list of `{Host, Port}' tuples or just `Host' string. +-spec parse_servers(undefined | string() | binary(), server_parse_option()) -> + [{string(), port_number()}]. +parse_servers(undefined, _Opts) -> + %% should not parse 'undefined' as string, + %% not to throw exception either, + %% leave it to the 'required => true | false' check + undefined; +parse_servers(Str, Opts) -> + case do_parse_servers(Str, Opts) of + [] -> + %% treat empty as 'undefined' + undefined; + [_ | _] = L -> + L + end. + +do_parse_servers([H | _] = Array, Opts) when is_binary(H) orelse is_list(H) -> + %% the old schema allowed providing a list of strings + %% e.g. ["server1:80", "server2:80"] + lists:map( + fun(HostPort) -> + do_parse_server(str(HostPort), Opts) + end, + Array + ); +do_parse_servers(Str, Opts) when is_binary(Str) orelse is_list(Str) -> + lists:map( + fun(HostPort) -> + do_parse_server(HostPort, Opts) + end, + split_host_port(Str) + ). + +split_host_port(Str) -> + lists:filtermap( + fun(S) -> + case string:strip(S) of + "" -> false; + X -> {true, X} + end + end, + string:tokens(str(Str), ",") + ). + +do_parse_server(Str, Opts) -> + DefaultPort = maps:get(default_port, Opts, undefined), + NotExpectingPort = maps:get(no_port, Opts, false), + case is_integer(DefaultPort) andalso NotExpectingPort of + true -> + %% either provide a default port from schema, + %% or do not allow user to set port number + error("bad_schema"); + false -> + ok + end, + %% do not split with space, there should be no space allowed between host and port + case string:tokens(Str, ":") of + [Hostname, Port] -> + NotExpectingPort andalso throw("not_expecting_port_number"), + {check_hostname(Hostname), parse_port(Port)}; + [Hostname] -> + case is_integer(DefaultPort) of + true -> + {check_hostname(Hostname), DefaultPort}; + false when NotExpectingPort -> + check_hostname(Hostname); + false -> + throw("missing_port_number") + end; + _ -> + throw("bad_host_port") + end. + +check_hostname(Str) -> + %% not intended to use inet_parse:domain here + %% only checking space because it interferes the parsing + case string:tokens(Str, " ") of + [H] -> + case is_port_number(H) of + true -> + throw("expecting_hostname_but_got_a_number"); + false -> + H + end; + _ -> + throw("hostname_has_space") + end. + +convert_hocon_map_host_port(Map) -> + lists:map( + fun({Host, Port}) -> + %% Only when Host:Port string is a valid HOCON object + %% is it possible for the converter to reach here. + %% + %% For example EMQX_FOO__SERVER='1.2.3.4:1234' is parsed as + %% a HOCON string value "1.2.3.4:1234" but not a map because + %% 1 is not a valid HOCON field. + %% + %% EMQX_FOO__SERVER="local.domain.host" (without ':port') + %% is also not a valid HOCON object (because it has no value), + %% hence parsed as string. + true = (Port > 0), + str(Host) ++ ":" ++ integer_to_list(Port) + end, + hocon_maps:flatten(Map, #{}) + ). + +is_port_number(Port) -> + try + _ = parse_port(Port), + true + catch + _:_ -> + false + end. + +parse_port(Port) -> + try + P = list_to_integer(string:strip(Port)), + true = (P > 0), + true = (P =< 65535), + P + catch + _:_ -> + throw("bad_port_number") + end. diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index fba70e303..df298849d 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -178,27 +178,290 @@ ssl_opts_gc_after_handshake_test_not_rancher_listener_test() -> to_ip_port_test_() -> Ip = fun emqx_schema:to_ip_port/1, - Host = fun(Str) -> - case Ip(Str) of - {ok, {_, _} = Res} -> - %% assert - {ok, Res} = emqx_schema:to_host_port(Str); - _ -> - emqx_schema:to_host_port(Str) - end - end, [ ?_assertEqual({ok, 80}, Ip("80")), - ?_assertEqual({error, bad_host_port}, Host("80")), ?_assertEqual({ok, 80}, Ip(":80")), - ?_assertEqual({error, bad_host_port}, Host(":80")), ?_assertEqual({error, bad_ip_port}, Ip("localhost:80")), - ?_assertEqual({ok, {"localhost", 80}}, Host("localhost:80")), - ?_assertEqual({ok, {"example.com", 80}}, Host("example.com:80")), ?_assertEqual({ok, {{127, 0, 0, 1}, 80}}, Ip("127.0.0.1:80")), ?_assertEqual({error, bad_ip_port}, Ip("$:1900")), - ?_assertEqual({error, bad_hostname}, Host("$:1900")), ?_assertMatch({ok, {_, 1883}}, Ip("[::1]:1883")), ?_assertMatch({ok, {_, 1883}}, Ip("::1:1883")), ?_assertMatch({ok, {_, 1883}}, Ip(":::1883")) ]. + +-define(T(CASE, EXPR), {CASE, fun() -> EXPR end}). + +parse_server_test_() -> + DefaultPort = ?LINE, + DefaultOpts = #{default_port => DefaultPort}, + Parse2 = fun(Value0, Opts) -> + Value = emqx_schema:convert_servers(Value0), + Validator = emqx_schema:servers_validator(Opts, _Required = true), + try + Result = emqx_schema:parse_servers(Value, Opts), + ?assertEqual(ok, Validator(Value)), + Result + catch + throw:Throw -> + %% assert validator throws the same exception + ?assertThrow(Throw, Validator(Value)), + %% and then let the test code validate the exception + throw(Throw) + end + end, + Parse = fun(Value) -> Parse2(Value, DefaultOpts) end, + HoconParse = fun(Str0) -> + {ok, Map} = hocon:binary(Str0), + Str = emqx_schema:convert_servers(Map), + Parse(Str) + end, + [ + ?T( + "single server, binary, no port", + ?assertEqual( + [{"localhost", DefaultPort}], + Parse(<<"localhost">>) + ) + ), + ?T( + "single server, string, no port", + ?assertEqual( + [{"localhost", DefaultPort}], + Parse("localhost") + ) + ), + ?T( + "single server, list(string), no port", + ?assertEqual( + [{"localhost", DefaultPort}], + Parse(["localhost"]) + ) + ), + ?T( + "single server, list(binary), no port", + ?assertEqual( + [{"localhost", DefaultPort}], + Parse([<<"localhost">>]) + ) + ), + ?T( + "single server, binary, with port", + ?assertEqual( + [{"localhost", 9999}], + Parse(<<"localhost:9999">>) + ) + ), + ?T( + "single server, list(string), with port", + ?assertEqual( + [{"localhost", 9999}], + Parse(["localhost:9999"]) + ) + ), + ?T( + "single server, string, with port", + ?assertEqual( + [{"localhost", 9999}], + Parse("localhost:9999") + ) + ), + ?T( + "single server, list(binary), with port", + ?assertEqual( + [{"localhost", 9999}], + Parse([<<"localhost:9999">>]) + ) + ), + ?T( + "multiple servers, string, no port", + ?assertEqual( + [{"host1", DefaultPort}, {"host2", DefaultPort}], + Parse("host1, host2") + ) + ), + ?T( + "multiple servers, binary, no port", + ?assertEqual( + [{"host1", DefaultPort}, {"host2", DefaultPort}], + Parse(<<"host1, host2,,,">>) + ) + ), + ?T( + "multiple servers, list(string), no port", + ?assertEqual( + [{"host1", DefaultPort}, {"host2", DefaultPort}], + Parse(["host1", "host2"]) + ) + ), + ?T( + "multiple servers, list(binary), no port", + ?assertEqual( + [{"host1", DefaultPort}, {"host2", DefaultPort}], + Parse([<<"host1">>, <<"host2">>]) + ) + ), + ?T( + "multiple servers, string, with port", + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + Parse("host1:1234, host2:2345") + ) + ), + ?T( + "multiple servers, binary, with port", + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + Parse(<<"host1:1234, host2:2345, ">>) + ) + ), + ?T( + "multiple servers, list(string), with port", + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + Parse([" host1:1234 ", "host2:2345"]) + ) + ), + ?T( + "multiple servers, list(binary), with port", + ?assertEqual( + [{"host1", 1234}, {"host2", 2345}], + Parse([<<"host1:1234">>, <<"host2:2345">>]) + ) + ), + ?T( + "unexpected multiple servers", + ?assertThrow( + "expecting_one_host_but_got: 2", + emqx_schema:parse_server(<<"host1:1234, host2:1234">>, #{default_port => 1}) + ) + ), + ?T( + "multiple servers without ports invalid string list", + ?assertThrow( + "hostname_has_space", + Parse2(["host1 host2"], #{no_port => true}) + ) + ), + ?T( + "multiple servers without ports invalid binary list", + ?assertThrow( + "hostname_has_space", + Parse2([<<"host1 host2">>], #{no_port => true}) + ) + ), + ?T( + "multiple servers wihtout port, mixed list(binary|string)", + ?assertEqual( + ["host1", "host2"], + Parse2([<<"host1">>, "host2"], #{no_port => true}) + ) + ), + ?T( + "no default port, missing port number in config", + ?assertThrow( + "missing_port_number", + emqx_schema:parse_server(<<"a">>, #{}) + ) + ), + ?T( + "empty binary string", + ?assertEqual( + undefined, + emqx_schema:parse_server(<<>>, #{no_port => true}) + ) + ), + ?T( + "empty array", + ?assertEqual( + undefined, + emqx_schema:parse_servers([], #{no_port => true}) + ) + ), + ?T( + "empty binary array", + ?assertThrow( + "bad_host_port", + emqx_schema:parse_servers([<<>>], #{no_port => true}) + ) + ), + ?T( + "HOCON value undefined", + ?assertEqual( + undefined, + emqx_schema:parse_server(undefined, #{no_port => true}) + ) + ), + ?T( + "single server map", + ?assertEqual( + [{"host1.domain", 1234}], + HoconParse("host1.domain:1234") + ) + ), + ?T( + "multiple servers map", + ?assertEqual( + [{"host1.domain", 1234}, {"host2.domain", 2345}, {"host3.domain", 3456}], + HoconParse("host1.domain:1234,host2.domain:2345,host3.domain:3456") + ) + ), + ?T( + "no port expected valid port", + ?assertThrow( + "not_expecting_port_number", + emqx_schema:parse_server("localhost:80", #{no_port => true}) + ) + ), + ?T( + "no port expected invalid port", + ?assertThrow( + "not_expecting_port_number", + emqx_schema:parse_server("localhost:notaport", #{no_port => true}) + ) + ), + + ?T( + "bad hostname", + ?assertThrow( + "expecting_hostname_but_got_a_number", + emqx_schema:parse_server(":80", #{default_port => 80}) + ) + ), + ?T( + "bad port", + ?assertThrow( + "bad_port_number", + emqx_schema:parse_server("host:33x", #{default_port => 33}) + ) + ), + ?T( + "bad host with port", + ?assertThrow( + "bad_host_port", + emqx_schema:parse_server("host:name:80", #{default_port => 80}) + ) + ), + ?T( + "bad schema", + ?assertError( + "bad_schema", + emqx_schema:parse_server("whatever", #{default_port => 10, no_port => true}) + ) + ) + ]. + +servers_validator_test() -> + Required = emqx_schema:servers_validator(#{}, true), + NotRequired = emqx_schema:servers_validator(#{}, false), + ?assertThrow("cannot_be_empty", Required("")), + ?assertThrow("cannot_be_empty", Required(<<>>)), + ?assertThrow("cannot_be_empty", Required(undefined)), + ?assertEqual(ok, NotRequired("")), + ?assertEqual(ok, NotRequired(<<>>)), + ?assertEqual(ok, NotRequired(undefined)), + ok. + +converter_invalid_input_test() -> + ?assertEqual(undefined, emqx_schema:convert_servers(undefined)), + %% 'foo: bar' is a valid HOCON value, but 'bar' is not a port number + ?assertThrow("bad_host_port", emqx_schema:convert_servers(#{foo => bar})). diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 2f84b7b90..e4e838d04 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -100,7 +100,6 @@ t_create_invalid(_Config) -> InvalidConfigs = [ - maps:without([<<"server">>], AuthConfig), AuthConfig#{<<"server">> => <<"unknownhost:3333">>}, AuthConfig#{<<"password">> => <<"wrongpass">>}, AuthConfig#{<<"database">> => <<"wrongdatabase">>} @@ -541,7 +540,7 @@ mysql_config() -> username => <<"root">>, password => <<"public">>, pool_size => 8, - server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT}, + server => <>, ssl => #{enable => false} }. diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index a0fbefb01..267f6aa56 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -32,7 +32,11 @@ -define(PATH, [authentication]). all() -> - [{group, require_seeds}, t_create_invalid]. + [ + {group, require_seeds}, + t_update_with_invalid_config, + t_update_with_bad_config_value + ]. groups() -> [{require_seeds, [], [t_create, t_authenticate, t_update, t_destroy, t_is_superuser]}]. @@ -96,12 +100,36 @@ t_create(_Config) -> {ok, [#{provider := emqx_authn_pgsql}]} = emqx_authentication:list_authenticators(?GLOBAL), emqx_authn_test_lib:delete_config(?ResourceID). -t_create_invalid(_Config) -> +%% invalid config which does not pass the schema check should result in an error +t_update_with_invalid_config(_Config) -> + AuthConfig = raw_pgsql_auth_config(), + BadConfig = maps:without([<<"server">>], AuthConfig), + ?assertMatch( + {error, + {bad_authenticator_config, #{ + reason := + {emqx_authn_pgsql, [ + #{ + kind := validation_error, + path := "authentication.server", + reason := required_field, + value := undefined + } + ]} + }}}, + emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, BadConfig} + ) + ), + ok. + +%% bad config values may cause connection failure, but should still be able to update +t_update_with_bad_config_value(_Config) -> AuthConfig = raw_pgsql_auth_config(), InvalidConfigs = [ - maps:without([<<"server">>], AuthConfig), AuthConfig#{<<"server">> => <<"unknownhost:3333">>}, AuthConfig#{<<"password">> => <<"wrongpass">>}, AuthConfig#{<<"database">> => <<"wrongdatabase">>} @@ -591,7 +619,7 @@ pgsql_config() -> username => <<"root">>, password => <<"public">>, pool_size => 8, - server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT}, + server => pgsql_server(), ssl => #{enable => false} }. diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index 7f4726dda..c7f81123b 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -31,7 +31,12 @@ -define(ResourceID, <<"password_based:redis">>). all() -> - [{group, require_seeds}, t_create, t_create_invalid]. + [ + {group, require_seeds}, + t_create, + t_create_with_config_values_wont_work, + t_create_invalid_config + ]. groups() -> [{require_seeds, [], [t_authenticate, t_update, t_destroy]}]. @@ -97,7 +102,7 @@ t_create(_Config) -> {ok, [#{provider := emqx_authn_redis}]} = emqx_authentication:list_authenticators(?GLOBAL). -t_create_invalid(_Config) -> +t_create_with_config_values_wont_work(_Config) -> AuthConfig = raw_redis_auth_config(), InvalidConfigs = [ @@ -131,7 +136,6 @@ t_create_invalid(_Config) -> InvalidConfigs1 = [ - maps:without([<<"server">>], AuthConfig), AuthConfig#{<<"server">> => <<"unknownhost:3333">>}, AuthConfig#{<<"password">> => <<"wrongpass">>}, AuthConfig#{<<"database">> => <<"5678">>} @@ -152,6 +156,22 @@ t_create_invalid(_Config) -> InvalidConfigs1 ). +t_create_invalid_config(_Config) -> + Config0 = raw_redis_auth_config(), + Config = maps:without([<<"server">>], Config0), + ?assertMatch( + {error, + {bad_authenticator_config, #{ + reason := {emqx_authn_redis, [#{kind := validation_error}]} + }}}, + emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, Config}) + ), + ?assertMatch([], emqx_config:get_raw([authentication])), + ?assertEqual( + {error, {not_found, {chain, ?GLOBAL}}}, + emqx_authentication:list_authenticators(?GLOBAL) + ). + t_authenticate(_Config) -> ok = lists:foreach( fun(Sample) -> @@ -591,7 +611,7 @@ redis_config() -> pool_size => 8, redis_type => single, password => "public", - server => {?REDIS_HOST, ?REDIS_DEFAULT_PORT}, + server => <>, ssl => #{enable => false} }. diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index e1acfd771..ede849dd5 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -317,8 +317,7 @@ raw_mysql_authz_config() -> "SELECT permission, action, topic " "FROM acl WHERE username = ${username}" >>, - - <<"server">> => mysql_server() + <<"server">> => <> }. q(Sql) -> @@ -385,9 +384,6 @@ setup_config(SpecialParams) -> SpecialParams ). -mysql_server() -> - iolist_to_binary(io_lib:format("~s", [?MYSQL_HOST])). - mysql_config() -> #{ auto_reconnect => true, @@ -395,7 +391,7 @@ mysql_config() -> username => <<"root">>, password => <<"public">>, pool_size => 8, - server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT}, + server => <>, ssl => #{enable => false} }. diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index 7ed19716f..42362d247 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -322,8 +322,7 @@ raw_pgsql_authz_config() -> "SELECT permission, action, topic " "FROM acl WHERE username = ${username}" >>, - - <<"server">> => pgsql_server() + <<"server">> => <> }. q(Sql) -> @@ -393,9 +392,6 @@ setup_config(SpecialParams) -> SpecialParams ). -pgsql_server() -> - iolist_to_binary(io_lib:format("~s", [?PGSQL_HOST])). - pgsql_config() -> #{ auto_reconnect => true, @@ -403,7 +399,7 @@ pgsql_config() -> username => <<"root">>, password => <<"public">>, pool_size => 8, - server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT}, + server => <>, ssl => #{enable => false} }. diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 54d7a406c..7a4f799ee 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -163,7 +163,7 @@ t_lookups(_Config) -> %% should still succeed to create even if the config will not work, %% because it's not a part of the schema check -t_create_with_config_values_wont_works(_Config) -> +t_create_with_config_values_wont_work(_Config) -> AuthzConfig = raw_redis_authz_config(), InvalidConfigs = @@ -182,11 +182,15 @@ t_create_with_config_values_wont_works(_Config) -> ). %% creating without a require filed should return error -t_create_invalid_schema(_Config) -> +t_create_invalid_config(_Config) -> AuthzConfig = raw_redis_authz_config(), C = maps:without([<<"server">>], AuthzConfig), ?assertMatch( - {error, {emqx_conf_schema, _}}, + {error, #{ + kind := validation_error, + path := "authorization.sources.1", + discarded_errors_count := 0 + }}, emqx_authz:update(?CMD_REPLACE, [C]) ). @@ -255,12 +259,9 @@ raw_redis_authz_config() -> <<"cmd">> => <<"HGETALL mqtt_user:${username}">>, <<"database">> => <<"1">>, <<"password">> => <<"public">>, - <<"server">> => redis_server() + <<"server">> => <> }. -redis_server() -> - iolist_to_binary(io_lib:format("~s", [?REDIS_HOST])). - q(Command) -> emqx_resource:query( ?REDIS_RESOURCE, @@ -274,7 +275,7 @@ redis_config() -> pool_size => 8, redis_type => single, password => "public", - server => {?REDIS_HOST, ?REDIS_DEFAULT_PORT}, + server => <>, ssl => #{enable => false} }. diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 2a379dbe4..65ef49c6b 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,6 +1,6 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ - {description, "An OTP application"}, + {description, "EMQX Data Integration Connectors"}, {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_connector_app, []}}, diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index d53c0e41b..117d7857c 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -35,6 +35,11 @@ -export([connect/1]). -export([search/4]). + +%% port is not expected from configuration because +%% all servers expected to use the same port number +-define(LDAP_HOST_OPTIONS, #{no_port => true}). + %%===================================================================== roots() -> ldap_fields() ++ emqx_connector_schema_lib:ssl_fields(). @@ -63,12 +68,7 @@ on_start( connector => InstId, config => Config }), - Servers = [ - begin - proplists:get_value(host, S) - end - || S <- Servers0 - ], + Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS), SslOpts = case maps:get(enable, SSL) of true -> @@ -86,8 +86,7 @@ on_start( {bind_password, BindPassword}, {timeout, Timeout}, {pool_size, PoolSize}, - {auto_reconnect, reconn_interval(AutoReconn)}, - {servers, Servers} + {auto_reconnect, reconn_interval(AutoReconn)} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of @@ -166,7 +165,7 @@ connect(Opts) -> ldap_fields() -> [ - {servers, fun servers/1}, + {servers, servers()}, {port, fun port/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {bind_dn, fun bind_dn/1}, @@ -175,11 +174,8 @@ ldap_fields() -> {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ]. -servers(type) -> list(); -servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")]; -servers(converter) -> fun to_servers_raw/1; -servers(required) -> true; -servers(_) -> undefined. +servers() -> + emqx_schema:servers_sc(#{}, ?LDAP_HOST_OPTIONS). bind_dn(type) -> binary(); bind_dn(default) -> 0; @@ -191,24 +187,3 @@ port(_) -> undefined. duration(type) -> emqx_schema:duration_ms(); duration(_) -> undefined. - -to_servers_raw(Servers) -> - {ok, - lists:map( - fun(Server) -> - case string:tokens(Server, ": ") of - [Ip] -> - [{host, Ip}]; - [Ip, Port] -> - [{host, Ip}, {port, list_to_integer(Port)}] - end - end, - string:tokens(str(Servers), ", ") - )}. - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. diff --git a/apps/emqx_connector/src/emqx_connector_lib.erl b/apps/emqx_connector/src/emqx_connector_lib.erl new file mode 100644 index 000000000..01104ea27 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_lib.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_lib). + +-export([resolve_dns/2]). + +%% @doc Mostly for meck. +resolve_dns(DNS, Type) -> + inet_res:lookup(DNS, in, Type). diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 678a4f847..61f672383 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -39,18 +39,16 @@ -export([mongo_query/5, mongo_insert/3, check_worker_health/1]). +%% for testing +-export([maybe_resolve_srv_and_txt_records/1]). + -define(HEALTH_CHECK_TIMEOUT, 30000). %% mongo servers don't need parse -define(MONGO_HOST_OPTIONS, #{ - host_type => hostname, default_port => ?MONGO_DEFAULT_PORT }). --ifdef(TEST). --export([to_servers_raw/1]). --endif. - %%===================================================================== roots() -> [ @@ -73,7 +71,7 @@ fields(single) -> required => true, desc => ?DESC("single_mongo_type") }}, - {server, fun server/1}, + {server, server()}, {w_mode, fun w_mode/1} ] ++ mongo_fields(); fields(rs) -> @@ -84,7 +82,7 @@ fields(rs) -> required => true, desc => ?DESC("rs_mongo_type") }}, - {servers, fun servers/1}, + {servers, servers()}, {w_mode, fun w_mode/1}, {r_mode, fun r_mode/1}, {replica_set_name, fun replica_set_name/1} @@ -97,7 +95,7 @@ fields(sharded) -> required => true, desc => ?DESC("sharded_mongo_type") }}, - {servers, fun servers/1}, + {servers, servers()}, {w_mode, fun w_mode/1} ] ++ mongo_fields(); fields(topology) -> @@ -161,7 +159,7 @@ on_start( sharded -> "starting_mongodb_sharded_connector" end, ?SLOG(info, #{msg => Msg, connector => InstId, config => Config}), - NConfig = #{hosts := Hosts} = may_parse_srv_and_txt_records(Config), + NConfig = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config), SslOpts = case maps:get(enable, SSL) of true -> @@ -387,19 +385,13 @@ init_worker_options([], Acc) -> %% =================================================================== %% Schema funcs -server(type) -> emqx_schema:host_port(); -server(required) -> true; -server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")]; -server(converter) -> fun to_server_raw/1; -server(desc) -> ?DESC("server"); -server(_) -> undefined. +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?MONGO_HOST_OPTIONS). -servers(type) -> list(); -servers(required) -> true; -servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")]; -servers(converter) -> fun to_servers_raw/1; -servers(desc) -> ?DESC("servers"); -servers(_) -> undefined. +servers() -> + Meta = #{desc => ?DESC("servers")}, + emqx_schema:servers_sc(Meta, ?MONGO_HOST_OPTIONS). w_mode(type) -> hoconsc:enum([unsafe, safe]); w_mode(desc) -> ?DESC("w_mode"); @@ -434,163 +426,109 @@ srv_record(_) -> undefined. %% =================================================================== %% Internal funcs -may_parse_srv_and_txt_records(#{server := Server} = Config) -> +maybe_resolve_srv_and_txt_records(#{server := Server} = Config) -> NConfig = maps:remove(server, Config), - may_parse_srv_and_txt_records_(NConfig#{servers => [Server]}); -may_parse_srv_and_txt_records(Config) -> - may_parse_srv_and_txt_records_(Config). + maybe_resolve_srv_and_txt_records1(Server, NConfig); +maybe_resolve_srv_and_txt_records(#{servers := Servers} = Config) -> + NConfig = maps:remove(servers, Config), + maybe_resolve_srv_and_txt_records1(Servers, NConfig). -may_parse_srv_and_txt_records_( +maybe_resolve_srv_and_txt_records1( + Servers0, #{ mongo_type := Type, - srv_record := false, - servers := Servers + srv_record := false } = Config ) -> case Type =:= rs andalso maps:is_key(replica_set_name, Config) =:= false of true -> - error({missing_parameter, replica_set_name}); + throw(#{ + reason => "missing_parameter", + param => replica_set_name + }); false -> - Config#{hosts => servers_to_bin(lists:flatten(Servers))} + Servers = parse_servers(Servers0), + Config#{hosts => format_hosts(Servers)} end; -may_parse_srv_and_txt_records_( +maybe_resolve_srv_and_txt_records1( + Servers, #{ mongo_type := Type, - srv_record := true, - servers := Servers + srv_record := true } = Config ) -> - Hosts = parse_srv_records(Type, Servers), - ExtraOpts = parse_txt_records(Type, Servers), + %% when srv is in use, it's typically only one DNS resolution needed, + %% however, by the schema definition, it's allowed to configure more than one. + %% here we keep only the fist + [{DNS, _IgnorePort} | _] = parse_servers(Servers), + DnsRecords = resolve_srv_records(DNS), + Hosts = format_hosts(DnsRecords), + ?tp(info, resolved_srv_records, #{dns => DNS, resolved_hosts => Hosts}), + ExtraOpts = resolve_txt_records(Type, DNS), + ?tp(info, resolved_txt_records, #{dns => DNS, resolved_options => ExtraOpts}), maps:merge(Config#{hosts => Hosts}, ExtraOpts). -parse_srv_records(Type, Servers) -> - Fun = fun(AccIn, {IpOrHost, _Port}) -> - case - inet_res:lookup( - "_mongodb._tcp." ++ - ip_or_host_to_string(IpOrHost), - in, - srv - ) - of - [] -> - error(service_not_found); - Services -> - [ - [server_to_bin({Host, Port}) || {_, _, Port, Host} <- Services] - | AccIn - ] - end - end, - Res = lists:foldl(Fun, [], Servers), - case Type of - single -> lists:nth(1, Res); - _ -> Res +resolve_srv_records(DNS0) -> + DNS = "_mongodb._tcp." ++ DNS0, + DnsData = emqx_connector_lib:resolve_dns(DNS, srv), + case [{Host, Port} || {_, _, Port, Host} <- DnsData] of + [] -> + throw(#{ + reason => "failed_to_resolve_srv_record", + dns => DNS + }); + L -> + L end. -parse_txt_records(Type, Servers) -> - Fields = - case Type of - rs -> ["authSource", "replicaSet"]; - _ -> ["authSource"] - end, - Fun = fun(AccIn, {IpOrHost, _Port}) -> - case inet_res:lookup(IpOrHost, in, txt) of - [] -> - #{}; - [[QueryString]] -> - case uri_string:dissect_query(QueryString) of - {error, _, _} -> - error({invalid_txt_record, invalid_query_string}); - Options -> - maps:merge(AccIn, take_and_convert(Fields, Options)) - end; - _ -> - error({invalid_txt_record, multiple_records}) - end - end, - lists:foldl(Fun, #{}, Servers). +resolve_txt_records(Type, DNS) -> + case emqx_connector_lib:resolve_dns(DNS, txt) of + [] -> + #{}; + [[QueryString]] = L -> + %% e.g. "authSource=admin&replicaSet=atlas-wrnled-shard-0" + case uri_string:dissect_query(QueryString) of + {error, _, _} -> + throw(#{ + reason => "bad_txt_record_resolution", + resolved => L + }); + Options -> + convert_options(Type, normalize_options(Options)) + end; + L -> + throw(#{ + reason => "multiple_txt_records", + resolved => L + }) + end. -take_and_convert(Fields, Options) -> - take_and_convert(Fields, Options, #{}). +normalize_options([]) -> + []; +normalize_options([{Name, Value} | Options]) -> + [{string:lowercase(Name), Value} | normalize_options(Options)]. -take_and_convert([], [_ | _], _Acc) -> - error({invalid_txt_record, invalid_option}); -take_and_convert([], [], Acc) -> - Acc; -take_and_convert([Field | More], Options, Acc) -> - case lists:keytake(Field, 1, Options) of - {value, {"authSource", V}, NOptions} -> - take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)}); - {value, {"replicaSet", V}, NOptions} -> - take_and_convert(More, NOptions, Acc#{replica_set_name => list_to_binary(V)}); - {value, _, _} -> - error({invalid_txt_record, invalid_option}); +convert_options(rs, Options) -> + M1 = maybe_add_option(auth_source, "authSource", Options), + M2 = maybe_add_option(replica_set_name, "replicaSet", Options), + maps:merge(M1, M2); +convert_options(_, Options) -> + maybe_add_option(auth_source, "authSource", Options). + +maybe_add_option(ConfigKey, OptName0, Options) -> + OptName = string:lowercase(OptName0), + case lists:keyfind(OptName, 1, Options) of + {_, OptValue} -> + #{ConfigKey => iolist_to_binary(OptValue)}; false -> - take_and_convert(More, Options, Acc) + #{} end. --spec ip_or_host_to_string(binary() | string() | tuple()) -> - string(). -ip_or_host_to_string(Ip) when is_tuple(Ip) -> - inet:ntoa(Ip); -ip_or_host_to_string(Host) -> - str(Host). +format_host({Host, Port}) -> + iolist_to_binary([Host, ":", integer_to_list(Port)]). -servers_to_bin([Server | Rest]) -> - [server_to_bin(Server) | servers_to_bin(Rest)]; -servers_to_bin([]) -> - []. +format_hosts(Hosts) -> + lists:map(fun format_host/1, Hosts). -server_to_bin({IpOrHost, Port}) -> - iolist_to_binary(ip_or_host_to_string(IpOrHost) ++ ":" ++ integer_to_list(Port)). - -%% =================================================================== -%% typereflt funcs - --spec to_server_raw(string()) -> - {string(), pos_integer()}. -to_server_raw(Server) -> - emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS). - --spec to_servers_raw(string()) -> - [{string(), pos_integer()}]. -to_servers_raw(Servers) -> - lists:map( - fun(Server) -> - emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS) - end, - split_servers(Servers) - ). - -split_servers(L) when is_list(L) -> - PossibleTypes = [ - list(binary()), - list(string()), - string() - ], - TypeChecks = lists:map(fun(T) -> typerefl:typecheck(T, L) end, PossibleTypes), - case TypeChecks of - [ok, _, _] -> - %% list(binary()) - lists:map(fun binary_to_list/1, L); - [_, ok, _] -> - %% list(string()) - L; - [_, _, ok] -> - %% string() - string:tokens(L, ", "); - [_, _, _] -> - %% invalid input - throw("List of servers must contain only strings") - end; -split_servers(B) when is_binary(B) -> - string:tokens(str(B), ", "). - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. +parse_servers(HoconValue) -> + emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 634968b09..12af239f7 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -43,7 +43,6 @@ -export([do_get_status/1]). -define(MYSQL_HOST_OPTIONS, #{ - host_type => inet_addr, default_port => ?MYSQL_DEFAULT_PORT }). @@ -66,17 +65,14 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - [{server, fun server/1}] ++ + [{server, server()}] ++ emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:prepare_statement_fields(). -server(type) -> emqx_schema:host_port(); -server(required) -> true; -server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")]; -server(converter) -> fun to_server/1; -server(desc) -> ?DESC("server"); -server(_) -> undefined. +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). %% =================================================================== callback_mode() -> always_sync. @@ -85,7 +81,7 @@ callback_mode() -> always_sync. on_start( InstId, #{ - server := {Host, Port}, + server := Server, database := DB, username := User, password := Password, @@ -94,6 +90,7 @@ on_start( ssl := SSL } = Config ) -> + {Host, Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS), ?SLOG(info, #{ msg => "starting_mysql_connector", connector => InstId, @@ -239,11 +236,6 @@ reconn_interval(false) -> false. connect(Options) -> mysql:start_link(Options). --spec to_server(string()) -> - {inet:ip_address() | inet:hostname(), pos_integer()}. -to_server(Str) -> - emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS). - init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) -> case maps:size(Prepares) of 0 -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 71dd2bbeb..569c8b640 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -44,7 +44,6 @@ -export([do_get_status/1]). -define(PGSQL_HOST_OPTIONS, #{ - host_type => inet_addr, default_port => ?PGSQL_DEFAULT_PORT }). @@ -54,17 +53,14 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - [{server, fun server/1}] ++ + [{server, server()}] ++ emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:prepare_statement_fields(). -server(type) -> emqx_schema:host_port(); -server(required) -> true; -server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")]; -server(converter) -> fun to_server/1; -server(desc) -> ?DESC("server"); -server(_) -> undefined. +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS). %% =================================================================== callback_mode() -> always_sync. @@ -72,7 +68,7 @@ callback_mode() -> always_sync. on_start( InstId, #{ - server := {Host, Port}, + server := Server, database := DB, username := User, password := Password, @@ -81,6 +77,7 @@ on_start( ssl := SSL } = Config ) -> + {Host, Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS), ?SLOG(info, #{ msg => "starting_postgresql_connector", connector => InstId, @@ -209,11 +206,3 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> conn_opts(Opts, [Opt | Acc]); conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). - -%% =================================================================== -%% typereflt funcs - --spec to_server(string()) -> - {inet:ip_address() | inet:hostname(), pos_integer()}. -to_server(Str) -> - emqx_connector_schema_lib:parse_server(Str, ?PGSQL_HOST_OPTIONS). diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 5a77ba6ab..ab7572a64 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -41,7 +41,6 @@ %% redis host don't need parse -define(REDIS_HOST_OPTIONS, #{ - host_type => hostname, default_port => ?REDIS_DEFAULT_PORT }). @@ -61,7 +60,7 @@ roots() -> fields(single) -> [ - {server, fun server/1}, + {server, server()}, {redis_type, #{ type => single, default => single, @@ -73,7 +72,7 @@ fields(single) -> emqx_connector_schema_lib:ssl_fields(); fields(cluster) -> [ - {servers, fun servers/1}, + {servers, servers()}, {redis_type, #{ type => cluster, default => cluster, @@ -85,7 +84,7 @@ fields(cluster) -> emqx_connector_schema_lib:ssl_fields(); fields(sentinel) -> [ - {servers, fun servers/1}, + {servers, servers()}, {redis_type, #{ type => sentinel, default => sentinel, @@ -101,21 +100,16 @@ fields(sentinel) -> redis_fields() ++ emqx_connector_schema_lib:ssl_fields(). -server(type) -> emqx_schema:host_port(); -server(required) -> true; -server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")]; -server(converter) -> fun to_server_raw/1; -server(desc) -> ?DESC("server"); -server(_) -> undefined. +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS). -servers(type) -> list(); -servers(required) -> true; -servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")]; -servers(converter) -> fun to_servers_raw/1; -servers(desc) -> ?DESC("servers"); -servers(_) -> undefined. +servers() -> + Meta = #{desc => ?DESC("servers")}, + emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS). %% =================================================================== + callback_mode() -> always_sync. on_start( @@ -132,11 +126,13 @@ on_start( connector => InstId, config => Config }), - Servers = + ConfKey = case Type of - single -> [{servers, [maps:get(server, Config)]}]; - _ -> [{servers, maps:get(servers, Config)}] + single -> server; + _ -> servers end, + Servers0 = maps:get(ConfKey, Config), + Servers = [{servers, emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)}], Database = case Type of cluster -> []; @@ -299,25 +295,3 @@ redis_fields() -> }}, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ]. - --spec to_server_raw(string()) -> - {string(), pos_integer()}. -to_server_raw(Server) -> - emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS). - --spec to_servers_raw(string()) -> - [{string(), pos_integer()}]. -to_servers_raw(Servers) -> - lists:map( - fun(Server) -> - emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS) - end, - string:tokens(str(Servers), ", ") - ). - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 7bcfb6d21..b0fcf9139 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -25,10 +25,6 @@ prepare_statement_fields/0 ]). --export([ - parse_server/2 -]). - -export([ pool_size/1, database/1, @@ -111,53 +107,3 @@ auto_reconnect(type) -> boolean(); auto_reconnect(desc) -> ?DESC("auto_reconnect"); auto_reconnect(default) -> true; auto_reconnect(_) -> undefined. - -parse_server(Str, #{host_type := inet_addr, default_port := DefaultPort}) -> - case string:tokens(str(Str), ": ") of - [Ip, Port] -> - {parse_ip(Ip), parse_port(Port)}; - [Ip] -> - {parse_ip(Ip), DefaultPort}; - _ -> - throw("Bad server schema") - end; -parse_server(Str, #{host_type := hostname, default_port := DefaultPort}) -> - case string:tokens(str(Str), ": ") of - [Hostname, Port] -> - {Hostname, parse_port(Port)}; - [Hostname] -> - {Hostname, DefaultPort}; - _ -> - throw("Bad server schema") - end; -parse_server(_, _) -> - throw("Invalid Host"). - -parse_ip(Str) -> - case inet:parse_address(Str) of - {ok, R} -> - R; - _ -> - %% check is a rfc1035's hostname - case inet_parse:domain(Str) of - true -> - Str; - _ -> - throw("Bad IP or Host") - end - end. - -parse_port(Port) -> - try - list_to_integer(Port) - catch - _:_ -> - throw("Bad port number") - end. - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index f1ecbf68c..ed372e196 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -51,15 +51,15 @@ start(Config) -> Parent = self(), - {Host, Port} = maps:get(server, Config), + ServerStr = iolist_to_binary(maps:get(server, Config)), + {Server, Port} = emqx_connector_mqtt_schema:parse_server(ServerStr), Mountpoint = maps:get(receive_mountpoint, Config, undefined), Subscriptions = maps:get(subscriptions, Config, undefined), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), - ServerStr = ip_port_to_server_str(Host, Port), Handlers = make_hdlr(Parent, Vars, #{server => ServerStr}), Config1 = Config#{ msg_handler => Handlers, - host => Host, + host => Server, port => Port, force_ping => true, proto_ver => maps:get(proto_ver, Config, v4) @@ -234,11 +234,3 @@ printable_maps(Headers) -> #{}, Headers ). - -ip_port_to_server_str(Host, Port) -> - HostStr = - case inet:ntoa(Host) of - {error, einval} -> Host; - IPStr -> IPStr - end, - list_to_binary(io_lib:format("~s:~w", [HostStr, Port])). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 1c9f66d21..a06ce8b47 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -25,13 +25,16 @@ namespace/0, roots/0, fields/1, - desc/1 + desc/1, + parse_server/1 ]). -import(emqx_schema, [mk_duration/2]). -import(hoconsc, [mk/2, ref/2]). +-define(MQTT_HOST_OPTS, #{default_port => 1883}). + namespace() -> "connector-mqtt". roots() -> @@ -67,14 +70,7 @@ fields("server_configs") -> desc => ?DESC("mode") } )}, - {server, - mk( - emqx_schema:host_port(), - #{ - required => true, - desc => ?DESC("server") - } - )}, + {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})}, {reconnect_interval, mk_duration( @@ -299,3 +295,6 @@ desc(_) -> qos() -> hoconsc:union([emqx_schema:qos(), binary()]). + +parse_server(Str) -> + emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS). diff --git a/apps/emqx_connector/test/emqx_connector_mongo_tests.erl b/apps/emqx_connector/test/emqx_connector_mongo_tests.erl index 7978ed289..9a40e7f82 100644 --- a/apps/emqx_connector/test/emqx_connector_mongo_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_mongo_tests.erl @@ -18,151 +18,135 @@ -include_lib("eunit/include/eunit.hrl"). --define(DEFAULT_MONGO_PORT, 27017). +srv_record_test() -> + with_dns_mock( + fun normal_dns_resolution_mock/2, + fun() -> + Single = single_config(), + Rs = simple_rs_config(), + Hosts = [ + <<"cluster0-shard-00-02.zkemc.mongodb.net:27017">>, + <<"cluster0-shard-00-01.zkemc.mongodb.net:27017">>, + <<"cluster0-shard-00-00.zkemc.mongodb.net:27017">> + ], + ?assertMatch( + #{ + hosts := Hosts, + auth_source := <<"admin">> + }, + resolve(Single) + ), + ?assertMatch( + #{ + hosts := Hosts, + auth_source := <<"admin">>, + replica_set_name := <<"atlas-wrnled-shard-0">> + }, + resolve(Rs) + ), + ok + end + ). -%%------------------------------------------------------------------------------ -%% Helper fns -%%------------------------------------------------------------------------------ +empty_srv_record_test() -> + with_dns_mock( + bad_srv_record_mock(_DnsResolution = []), + fun() -> + ?assertThrow(#{reason := "failed_to_resolve_srv_record"}, resolve(simple_rs_config())) + end + ). -%%------------------------------------------------------------------------------ -%% Test cases -%%------------------------------------------------------------------------------ +empty_txt_record_test() -> + with_dns_mock( + bad_txt_record_mock(_DnsResolution = []), + fun() -> + Config = resolve(single_config()), + ?assertNot(maps:is_key(auth_source, Config)), + ?assertNot(maps:is_key(replica_set_name, Config)), + ok + end + ). -to_servers_raw_test_() -> +multiple_txt_records_test() -> + with_dns_mock( + bad_txt_record_mock(_DnsResolution = [1, 2]), + fun() -> + ?assertThrow(#{reason := "multiple_txt_records"}, resolve(simple_rs_config())) + end + ). + +bad_query_string_test() -> + with_dns_mock( + bad_txt_record_mock(_DnsResolution = [["%-111"]]), + fun() -> + ?assertThrow(#{reason := "bad_txt_record_resolution"}, resolve(simple_rs_config())) + end + ). + +resolve(Config) -> + emqx_connector_mongo:maybe_resolve_srv_and_txt_records(Config). + +checked_config(Hocon) -> + {ok, Config} = hocon:binary(Hocon), + hocon_tconf:check_plain( + emqx_connector_mongo, + #{<<"config">> => Config}, + #{atom_key => true} + ). + +simple_rs_config() -> + #{config := Rs} = checked_config( + "mongo_type = rs\n" + "servers = \"cluster0.zkemc.mongodb.net:27017\"\n" + "srv_record = true\n" + "database = foobar\n" + "replica_set_name = configured_replicaset_name\n" + ), + Rs. + +single_config() -> + #{config := Single} = checked_config( + "mongo_type = single\n" + "server = \"cluster0.zkemc.mongodb.net:27017,cluster0.zkemc.mongodb.net:27017\"\n" + "srv_record = true\n" + "database = foobar\n" + ), + Single. + +normal_srv_resolution() -> [ - {"single server, binary, no port", - ?_test( - ?assertEqual( - [{"localhost", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw(<<"localhost">>) - ) - )}, - {"single server, string, no port", - ?_test( - ?assertEqual( - [{"localhost", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw("localhost") - ) - )}, - {"single server, list(binary), no port", - ?_test( - ?assertEqual( - [{"localhost", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw([<<"localhost">>]) - ) - )}, - {"single server, list(string), no port", - ?_test( - ?assertEqual( - [{"localhost", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw(["localhost"]) - ) - )}, - %%%%%%%%% - {"single server, binary, with port", - ?_test( - ?assertEqual( - [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw(<<"localhost:9999">>) - ) - )}, - {"single server, string, with port", - ?_test( - ?assertEqual( - [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw("localhost:9999") - ) - )}, - {"single server, list(binary), with port", - ?_test( - ?assertEqual( - [{"localhost", 9999}], - emqx_connector_mongo:to_servers_raw([<<"localhost:9999">>]) - ) - )}, - {"single server, list(string), with port", - ?_test( - ?assertEqual( - [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw(["localhost:9999"]) - ) - )}, - %%%%%%%%% - {"multiple servers, string, no port", - ?_test( - ?assertEqual( - [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw("host1, host2") - ) - )}, - {"multiple servers, binary, no port", - ?_test( - ?assertEqual( - [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw(<<"host1, host2">>) - ) - )}, - {"multiple servers, list(string), no port", - ?_test( - ?assertEqual( - [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw(["host1", "host2"]) - ) - )}, - {"multiple servers, list(binary), no port", - ?_test( - ?assertEqual( - [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}], - emqx_connector_mongo:to_servers_raw([<<"host1">>, <<"host2">>]) - ) - )}, - %%%%%%%%% - {"multiple servers, string, with port", - ?_test( - ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], - emqx_connector_mongo:to_servers_raw("host1:1234, host2:2345") - ) - )}, - {"multiple servers, binary, with port", - ?_test( - ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], - emqx_connector_mongo:to_servers_raw(<<"host1:1234, host2:2345">>) - ) - )}, - {"multiple servers, list(string), with port", - ?_test( - ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], - emqx_connector_mongo:to_servers_raw(["host1:1234", "host2:2345"]) - ) - )}, - {"multiple servers, list(binary), with port", - ?_test( - ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], - emqx_connector_mongo:to_servers_raw([<<"host1:1234">>, <<"host2:2345">>]) - ) - )}, - %%%%%%%% - {"multiple servers, invalid list(string)", - ?_test( - ?assertThrow( - _, - emqx_connector_mongo:to_servers_raw(["host1, host2"]) - ) - )}, - {"multiple servers, invalid list(binary)", - ?_test( - ?assertThrow( - _, - emqx_connector_mongo:to_servers_raw([<<"host1, host2">>]) - ) - )}, - %% TODO: handle this case?? - {"multiple servers, mixed list(binary|string)", - ?_test( - ?assertThrow( - _, - emqx_connector_mongo:to_servers_raw([<<"host1">>, "host2"]) - ) - )} + {0, 0, 27017, "cluster0-shard-00-02.zkemc.mongodb.net"}, + {0, 0, 27017, "cluster0-shard-00-01.zkemc.mongodb.net"}, + {0, 0, 27017, "cluster0-shard-00-00.zkemc.mongodb.net"} ]. + +normal_txt_resolution() -> + [["authSource=admin&replicaSet=atlas-wrnled-shard-0"]]. + +normal_dns_resolution_mock("_mongodb._tcp.cluster0.zkemc.mongodb.net", srv) -> + normal_srv_resolution(); +normal_dns_resolution_mock("cluster0.zkemc.mongodb.net", txt) -> + normal_txt_resolution(). + +bad_srv_record_mock(DnsResolution) -> + fun("_mongodb._tcp.cluster0.zkemc.mongodb.net", srv) -> + DnsResolution + end. + +bad_txt_record_mock(DnsResolution) -> + fun + ("_mongodb._tcp.cluster0.zkemc.mongodb.net", srv) -> + normal_srv_resolution(); + ("cluster0.zkemc.mongodb.net", txt) -> + DnsResolution + end. + +with_dns_mock(MockFn, TestFn) -> + meck:new(emqx_connector_lib, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_connector_lib, resolve_dns, MockFn), + try + TestFn() + after + meck:unload(emqx_connector_lib) + end, + ok. diff --git a/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl b/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl index 2bb9abd84..b5c937b55 100644 --- a/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl @@ -50,8 +50,8 @@ send_and_ack_test() -> try Max = 1, Batch = lists:seq(1, Max), - {ok, Conn} = emqx_connector_mqtt_mod:start(#{server => {{127, 0, 0, 1}, 1883}}), - % %% return last packet id as batch reference + {ok, Conn} = emqx_connector_mqtt_mod:start(#{server => "127.0.0.1:1883"}), + %% return last packet id as batch reference {ok, _AckRef} = emqx_connector_mqtt_mod:send(Conn, Batch), ok = emqx_connector_mqtt_mod:stop(Conn) diff --git a/apps/emqx_dashboard/src/emqx_dashboard.app.src b/apps/emqx_dashboard/src/emqx_dashboard.app.src index 56bd64c74..2698d5534 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.app.src +++ b/apps/emqx_dashboard/src/emqx_dashboard.app.src @@ -2,7 +2,7 @@ {application, emqx_dashboard, [ {description, "EMQX Web Dashboard"}, % strict semver, bump manually! - {vsn, "5.0.10"}, + {vsn, "5.0.11"}, {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel, stdlib, mnesia, minirest, emqx]}, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6514ed9ef..8e0359d9b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -675,8 +675,6 @@ typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/file">>}; typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>}; -typename_to_spec("host_port()", _Mod) -> - #{type => string, example => <<"example.host.domain:80">>}; typename_to_spec("write_syntax()", _Mod) -> #{ type => string, diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 6376af6ff..53403a67a 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.9"}, + {vsn, "0.1.10"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [kernel, stdlib, grpc, emqx, emqx_authn]}, diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl index ecde268ba..20f2d5467 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl @@ -18,7 +18,13 @@ -behaviour(gen_server). +-ifdef(TEST). +%% make rebar3 ct happy when testing with --suite path/to/module_SUITE.erl +-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl"). +-else. +%% make mix happy -include("src/mqttsn/include/emqx_sn.hrl"). +-endif. -include_lib("emqx/include/logger.hrl"). -export([ diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 63c6c01ad..fdfd2b28f 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib]}, diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index 92d856670..d2f88bd6d 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -1,2 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -define(APP, emqx_statsd). -define(STATSD, [statsd]). +-define(SERVER_PARSE_OPTS, #{default_port => 8125}). diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 5f32567d6..638c5a33b 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_statsd, [ {description, "EMQX Statsd"}, - {vsn, "5.0.3"}, + {vsn, "5.0.4"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, [ diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 4b0a98cd3..71cba98b6 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -75,10 +75,11 @@ init([]) -> process_flag(trap_exit, true), #{ tags := TagsRaw, - server := {Host, Port}, + server := Server, sample_time_interval := SampleTimeInterval, flush_time_interval := FlushTimeInterval } = emqx_conf:get([statsd]), + {Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], {ok, Pid} = estatsd:start_link(Opts), diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 3fb51f3bd..87ca22a5a 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -18,6 +18,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). +-include("emqx_statsd.hrl"). -behaviour(hocon_schema). @@ -44,7 +45,7 @@ fields("statsd") -> desc => ?DESC(enable) } )}, - {server, fun server/1}, + {server, server()}, {sample_time_interval, fun sample_interval/1}, {flush_time_interval, fun flush_interval/1}, {tags, fun tags/1} @@ -53,11 +54,12 @@ fields("statsd") -> desc("statsd") -> ?DESC(statsd); desc(_) -> undefined. -server(type) -> emqx_schema:host_port(); -server(required) -> true; -server(default) -> "127.0.0.1:8125"; -server(desc) -> ?DESC(?FUNCTION_NAME); -server(_) -> undefined. +server() -> + Meta = #{ + required => true, + desc => ?DESC(?FUNCTION_NAME) + }, + emqx_schema:servers_sc(Meta, ?SERVER_PARSE_OPTS). sample_interval(type) -> emqx_schema:duration_ms(); sample_interval(required) -> true; diff --git a/changes/v5.0.14-en.md b/changes/v5.0.14-en.md index 214d4e58e..8e9b14e76 100644 --- a/changes/v5.0.14-en.md +++ b/changes/v5.0.14-en.md @@ -2,6 +2,10 @@ ## Enhancements +- Make possible to configure `host:port` from environment variables without quotes [#9614](https://github.com/emqx/emqx/pull/9614). + Prior to this change, when overriding a `host:port` config value from environment variable, one has to quote it as: + `env EMQX_BRIDGES__MQTT__XYZ__SERVER='"localhost:1883"'`. + Now it's possible to set it without quote as `env EMQX_BRIDGES__MQTT__XYZ__SERVER='localhost:1883'`. ## Bug Fixes diff --git a/changes/v5.0.14-zh.md b/changes/v5.0.14-zh.md index dc77784a9..cbca3ad32 100644 --- a/changes/v5.0.14-zh.md +++ b/changes/v5.0.14-zh.md @@ -2,6 +2,10 @@ ## 增强 +- 允许环境变量重载 `host:port` 值时不使用引号 [#9614](https://github.com/emqx/emqx/pull/9614)。 + 在此修复前,环境变量中使用 `host:port` 这种配置时,用户必须使用引号,例如: + `env EMQX_BRIDGES__MQTT__XYZ__SERVER='"localhost:1883"'`。 + 此修复后,可以不使用引号,例如 `env EMQX_BRIDGES__MQTT__XYZ__SERVER='localhost:1883'`。 ## 修复 diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index c163db70b..7ccf20d0b 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -81,8 +81,8 @@ emqx_ee_bridge_kafka { } bootstrap_hosts { desc { - en: "A comma separated list of Kafka host:port endpoints to bootstrap the client." - zh: "用逗号分隔的 host:port 主机列表。" + en: "A comma separated list of Kafka host[:port] endpoints to bootstrap the client. Default port number is 9092." + zh: "用逗号分隔的 host[:port] 主机列表。默认端口号为 9092。" } label { en: "Bootstrap Hosts" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 2540b987c..c94d47a4e 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -26,7 +26,8 @@ namespace/0, roots/0, fields/1, - desc/1 + desc/1, + host_opts/0 ]). %% ------------------------------------------------------------------------------------------------- @@ -54,6 +55,9 @@ values(put) -> %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions +host_opts() -> + #{default_port => 9092}. + namespace() -> "bridge_kafka". roots() -> ["config"]. @@ -67,7 +71,17 @@ fields("get") -> fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {bootstrap_hosts, mk(binary(), #{required => true, desc => ?DESC(bootstrap_hosts)})}, + {bootstrap_hosts, + mk( + binary(), + #{ + required => true, + desc => ?DESC(bootstrap_hosts), + validator => emqx_schema:servers_validator( + host_opts(), _Required = true + ) + } + )}, {connect_timeout, mk(emqx_schema:duration_ms(), #{ default => "5s", diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index b46bdb486..353652636 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -177,10 +177,8 @@ on_get_status(_InstId, _State) -> connected. %% Parse comma separated host:port list into a [{Host,Port}] list -hosts(Hosts) when is_binary(Hosts) -> - hosts(binary_to_list(Hosts)); -hosts(Hosts) when is_list(Hosts) -> - kpro:parse_endpoints(Hosts). +hosts(Hosts) -> + emqx_schema:parse_servers(Hosts, emqx_ee_bridge_kafka:host_opts()). %% Extra socket options, such as sndbuf size etc. socket_opts(Opts) when is_map(Opts) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index f83a96cb2..7926b1b91 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -777,15 +777,13 @@ t_publish_success_batch(Config) -> t_not_a_json(Config) -> ?assertMatch( - {error, - {_, [ - #{ - kind := validation_error, - reason := #{exception := {error, {badmap, "not a json"}}}, - %% should be censored as it contains secrets - value := <<"******">> - } - ]}}, + {error, #{ + discarded_errors_count := 0, + kind := validation_error, + reason := #{exception := {error, {badmap, "not a json"}}}, + %% should be censored as it contains secrets + value := <<"******">> + }}, create_bridge( Config, #{ @@ -797,15 +795,13 @@ t_not_a_json(Config) -> t_not_of_service_account_type(Config) -> ?assertMatch( - {error, - {_, [ - #{ - kind := validation_error, - reason := {wrong_type, <<"not a service account">>}, - %% should be censored as it contains secrets - value := <<"******">> - } - ]}}, + {error, #{ + discarded_errors_count := 0, + kind := validation_error, + reason := {wrong_type, <<"not a service account">>}, + %% should be censored as it contains secrets + value := <<"******">> + }}, create_bridge( Config, #{ @@ -818,22 +814,20 @@ t_not_of_service_account_type(Config) -> t_json_missing_fields(Config) -> GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config), ?assertMatch( - {error, - {_, [ - #{ - kind := validation_error, - reason := - {missing_keys, [ - <<"client_email">>, - <<"private_key">>, - <<"private_key_id">>, - <<"project_id">>, - <<"type">> - ]}, - %% should be censored as it contains secrets - value := <<"******">> - } - ]}}, + {error, #{ + discarded_errors_count := 0, + kind := validation_error, + reason := + {missing_keys, [ + <<"client_email">>, + <<"private_key">>, + <<"private_key_id">>, + <<"project_id">>, + <<"type">> + ]}, + %% should be censored as it contains secrets + value := <<"******">> + }}, create_bridge([ {gcp_pubsub_config, GCPPubSubConfig0#{<<"service_account_json">> := #{}}} | Config diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index cd6a2d212..7efeb553e 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -374,21 +374,15 @@ all_test_hosts() -> lists:flatmap( fun (#{<<"servers">> := ServersRaw}) -> - lists:map( - fun(Server) -> - parse_server(Server) - end, - string:tokens(binary_to_list(ServersRaw), ", ") - ); + parse_servers(ServersRaw); (#{<<"server">> := ServerRaw}) -> - [parse_server(ServerRaw)] + parse_servers(ServerRaw) end, Confs ). -parse_server(Server) -> - emqx_connector_schema_lib:parse_server(Server, #{ - host_type => hostname, +parse_servers(Servers) -> + emqx_schema:parse_servers(Servers, #{ default_port => 6379 }). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 29170be41..421d6c5da 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -83,9 +83,7 @@ on_start( %% emulating the emulator behavior %% https://cloud.google.com/pubsub/docs/emulator HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"), - {Host, Port} = emqx_connector_schema_lib:parse_server( - HostPort, #{host_type => hostname, default_port => 443} - ), + {Host, Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}), PoolType = random, Transport = tls, TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 3ef9fc352..e6411db31 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -35,7 +35,6 @@ %% influxdb servers don't need parse -define(INFLUXDB_HOST_OPTIONS, #{ - host_type => hostname, default_port => ?INFLUXDB_DEFAULT_PORT }). @@ -141,7 +140,7 @@ namespace() -> connector_influxdb. fields(common) -> [ - {server, fun server/1}, + {server, server()}, {precision, mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") @@ -164,13 +163,14 @@ fields(influxdb_api_v2) -> {token, mk(binary(), #{required => true, desc => ?DESC("token")})} ] ++ emqx_connector_schema_lib:ssl_fields(). -server(type) -> emqx_schema:ip_port(); -server(required) -> true; -server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")]; -server(converter) -> fun to_server_raw/1; -server(default) -> <<"127.0.0.1:8086">>; -server(desc) -> ?DESC("server"); -server(_) -> undefined. +server() -> + Meta = #{ + required => false, + default => <<"127.0.0.1:8086">>, + desc => ?DESC("server"), + converter => fun convert_server/2 + }, + emqx_schema:servers_sc(Meta, ?INFLUXDB_HOST_OPTIONS). desc(common) -> ?DESC("common"); @@ -261,9 +261,10 @@ do_start_client( client_config( InstId, Config = #{ - server := {Host, Port} + server := Server } ) -> + {Host, Port} = emqx_schema:parse_server(Server, ?INFLUXDB_HOST_OPTIONS), [ {host, str(Host)}, {port, Port}, @@ -542,17 +543,12 @@ log_error_points(InstId, Errs) -> Errs ). -%% =================================================================== -%% typereflt funcs - --spec to_server_raw(string() | binary()) -> - {string(), pos_integer()}. -to_server_raw(<<"http://", Server/binary>>) -> - emqx_connector_schema_lib:parse_server(Server, ?INFLUXDB_HOST_OPTIONS); -to_server_raw(<<"https://", Server/binary>>) -> - emqx_connector_schema_lib:parse_server(Server, ?INFLUXDB_HOST_OPTIONS); -to_server_raw(Server) -> - emqx_connector_schema_lib:parse_server(Server, ?INFLUXDB_HOST_OPTIONS). +convert_server(<<"http://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(<<"https://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(Server, HoconOpts) -> + emqx_schema:convert_servers(Server, HoconOpts). str(A) when is_atom(A) -> atom_to_list(A); @@ -568,22 +564,6 @@ str(S) when is_list(S) -> -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -to_server_raw_test_() -> - [ - ?_assertEqual( - {"foobar", 1234}, - to_server_raw(<<"http://foobar:1234">>) - ), - ?_assertEqual( - {"foobar", 1234}, - to_server_raw(<<"https://foobar:1234">>) - ), - ?_assertEqual( - {"foobar", 1234}, - to_server_raw(<<"foobar:1234">>) - ) - ]. - %% for coverage desc_test_() -> [ @@ -605,7 +585,7 @@ desc_test_() -> ), ?_assertMatch( {desc, _, _}, - server(desc) + hocon_schema:field_schema(server(), desc) ), ?_assertMatch( connector_influxdb, diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index 95e71f128..3432c757c 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -1,7 +1,8 @@ #!/usr/bin/env bash set -euo pipefail -latest_release=$(git describe --abbrev=0 --tags --exclude '*rc*' --exclude '*alpha*' --exclude '*beta*') +latest_release=$(git describe --abbrev=0 --tags --exclude '*rc*' --exclude '*alpha*' --exclude '*beta*' --exclude '*docker*') +echo "Compare base: $latest_release" bad_app_count=0 diff --git a/scripts/git-hook-pre-commit.sh b/scripts/git-hook-pre-commit.sh index a5e441d05..b0e0699b9 100755 --- a/scripts/git-hook-pre-commit.sh +++ b/scripts/git-hook-pre-commit.sh @@ -15,3 +15,4 @@ if ! (./scripts/erlfmt $OPT $files); then echo "EXECUTE 'make fmt' to fix" >&2 exit 1 fi +./scripts/apps-version-check.sh