diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 272a1d0cd..248fdad7f 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -42,7 +42,12 @@ -type ip_port() :: tuple() | integer(). -type cipher() :: map(). -type port_number() :: 1..65536. --type server_parse_option() :: #{default_port => port_number(), no_port => boolean()}. +-type server_parse_option() :: #{ + default_port => port_number(), + no_port => boolean(), + supported_schemes => [string()], + default_scheme => string() +}. -type url() :: binary(). -type json_binary() :: binary(). @@ -2896,7 +2901,10 @@ servers_validator(Opts, Required) -> %% `no_port': by default it's `false', when set to `true', %% a `throw' exception is raised if the port is found. -spec parse_server(undefined | string() | binary(), server_parse_option()) -> - {string(), port_number()}. + string() + | {string(), port_number()} + | {string(), string()} + | {string(), string(), port_number()}. parse_server(Str, Opts) -> case parse_servers(Str, Opts) of undefined -> @@ -2910,7 +2918,12 @@ parse_server(Str, Opts) -> %% @doc Parse comma separated `host[:port][,host[:port]]' endpoints %% into a list of `{Host, Port}' tuples or just `Host' string. -spec parse_servers(undefined | string() | binary(), server_parse_option()) -> - [{string(), port_number()}]. + [ + string() + | {string(), port_number()} + | {string(), string()} + | {string(), string(), port_number()} + ]. parse_servers(undefined, _Opts) -> %% should not parse 'undefined' as string, %% not to throw exception either, @@ -2956,6 +2969,9 @@ split_host_port(Str) -> do_parse_server(Str, Opts) -> DefaultPort = maps:get(default_port, Opts, undefined), NotExpectingPort = maps:get(no_port, Opts, false), + DefaultScheme = maps:get(default_scheme, Opts, undefined), + SupportedSchemes = maps:get(supported_schemes, Opts, []), + NotExpectingScheme = (not is_list(DefaultScheme)) andalso length(SupportedSchemes) =:= 0, case is_integer(DefaultPort) andalso NotExpectingPort of true -> %% either provide a default port from schema, @@ -2964,24 +2980,74 @@ do_parse_server(Str, Opts) -> false -> ok end, + case is_list(DefaultScheme) andalso (not lists:member(DefaultScheme, SupportedSchemes)) of + true -> + %% inconsistent schema + error("bad_schema"); + false -> + ok + end, %% do not split with space, there should be no space allowed between host and port case string:tokens(Str, ":") of - [Hostname, Port] -> + [Scheme, "//" ++ Hostname, Port] -> NotExpectingPort andalso throw("not_expecting_port_number"), - {check_hostname(Hostname), parse_port(Port)}; - [Hostname] -> + NotExpectingScheme andalso throw("not_expecting_scheme"), + {check_scheme(Scheme, Opts), check_hostname(Hostname), parse_port(Port)}; + [Scheme, "//" ++ Hostname] -> + NotExpectingScheme andalso throw("not_expecting_scheme"), case is_integer(DefaultPort) of true -> - {check_hostname(Hostname), DefaultPort}; + {check_scheme(Scheme, Opts), check_hostname(Hostname), DefaultPort}; false when NotExpectingPort -> - check_hostname(Hostname); + {check_scheme(Scheme, Opts), check_hostname(Hostname)}; false -> throw("missing_port_number") end; + [Hostname, Port] -> + NotExpectingPort andalso throw("not_expecting_port_number"), + case is_list(DefaultScheme) of + false -> + {check_hostname(Hostname), parse_port(Port)}; + true -> + {DefaultScheme, check_hostname(Hostname), parse_port(Port)} + end; + [Hostname] -> + case is_integer(DefaultPort) orelse NotExpectingPort of + true -> + ok; + false -> + throw("missing_port_number") + end, + case is_list(DefaultScheme) orelse NotExpectingScheme of + true -> + ok; + false -> + throw("missing_scheme") + end, + case {is_integer(DefaultPort), is_list(DefaultScheme)} of + {true, true} -> + {DefaultScheme, check_hostname(Hostname), DefaultPort}; + {true, false} -> + {check_hostname(Hostname), DefaultPort}; + {false, true} -> + {DefaultScheme, check_hostname(Hostname)}; + {false, false} -> + check_hostname(Hostname) + end; _ -> throw("bad_host_port") end. +check_scheme(Str, Opts) -> + SupportedSchemes = maps:get(supported_schemes, Opts, []), + IsSupported = lists:member(Str, SupportedSchemes), + case IsSupported of + true -> + Str; + false -> + throw("unsupported_scheme") + end. + check_hostname(Str) -> %% not intended to use inet_parse:domain here %% only checking space because it interferes the parsing diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index 5176f4fad..c13dc8055 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -350,7 +350,7 @@ parse_server_test_() -> ) ), ?T( - "multiple servers wihtout port, mixed list(binary|string)", + "multiple servers without port, mixed list(binary|string)", ?assertEqual( ["host1", "host2"], Parse2([<<"host1">>, "host2"], #{no_port => true}) @@ -447,6 +447,171 @@ parse_server_test_() -> "bad_schema", emqx_schema:parse_server("whatever", #{default_port => 10, no_port => true}) ) + ), + ?T( + "scheme, hostname and port", + ?assertEqual( + {"pulsar+ssl", "host", 6651}, + emqx_schema:parse_server( + "pulsar+ssl://host:6651", + #{ + default_port => 6650, + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "scheme and hostname, default port", + ?assertEqual( + {"pulsar", "host", 6650}, + emqx_schema:parse_server( + "pulsar://host", + #{ + default_port => 6650, + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "scheme and hostname, no port", + ?assertEqual( + {"pulsar", "host"}, + emqx_schema:parse_server( + "pulsar://host", + #{ + no_port => true, + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "scheme and hostname, missing port", + ?assertThrow( + "missing_port_number", + emqx_schema:parse_server( + "pulsar://host", + #{ + no_port => false, + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "hostname, default scheme, no default port", + ?assertEqual( + {"pulsar", "host"}, + emqx_schema:parse_server( + "host", + #{ + default_scheme => "pulsar", + no_port => true, + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "hostname, default scheme, default port", + ?assertEqual( + {"pulsar", "host", 6650}, + emqx_schema:parse_server( + "host", + #{ + default_port => 6650, + default_scheme => "pulsar", + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "just hostname, expecting missing scheme", + ?assertThrow( + "missing_scheme", + emqx_schema:parse_server( + "host", + #{ + no_port => true, + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "hostname, default scheme, defined port", + ?assertEqual( + {"pulsar", "host", 6651}, + emqx_schema:parse_server( + "host:6651", + #{ + default_port => 6650, + default_scheme => "pulsar", + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "inconsistent scheme opts", + ?assertError( + "bad_schema", + emqx_schema:parse_server( + "pulsar+ssl://host:6651", + #{ + default_port => 6650, + default_scheme => "something", + supported_schemes => ["not", "supported"] + } + ) + ) + ), + ?T( + "hostname, default scheme, defined port", + ?assertEqual( + {"pulsar", "host", 6651}, + emqx_schema:parse_server( + "host:6651", + #{ + default_port => 6650, + default_scheme => "pulsar", + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) + ), + ?T( + "unsupported scheme", + ?assertThrow( + "unsupported_scheme", + emqx_schema:parse_server( + "pulsar+quic://host:6651", + #{ + default_port => 6650, + supported_schemes => ["pulsar"] + } + ) + ) + ), + ?T( + "multiple hostnames with schemes (1)", + ?assertEqual( + [ + {"pulsar", "host", 6649}, + {"pulsar+ssl", "other.host", 6651}, + {"pulsar", "yet.another", 6650} + ], + emqx_schema:parse_servers( + "pulsar://host:6649, pulsar+ssl://other.host:6651,pulsar://yet.another", + #{ + default_port => 6650, + supported_schemes => ["pulsar", "pulsar+ssl"] + } + ) + ) ) ].