diff --git a/Makefile b/Makefile
index 2e45b3068..574827b3d 100644
--- a/Makefile
+++ b/Makefile
@@ -129,7 +129,7 @@ $(PROFILES:%=clean-%):
rm rebar.lock \
rm -rf _build/$(@:clean-%=%)/rel; \
$(FIND) _build/$(@:clean-%=%) -name '*.beam' -o -name '*.so' -o -name '*.app' -o -name '*.appup' -o -name '*.o' -o -name '*.d' -type f | xargs rm -f; \
- $(FIND) _build/$(@:clean-%=%) -type l -delete; \
+ $(FIND) _build/$(@:clean-%=%) -type l -delete; \
fi
.PHONY: clean-all
diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl
index 038175854..8929eba20 100644
--- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl
+++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl
@@ -50,8 +50,7 @@ fields(?CONF_NS) ->
, {query, fun query/1}
, {query_timeout, fun query_timeout/1}
] ++ emqx_authn_schema:common_fields()
- ++ emqx_connector_schema_lib:relational_db_fields()
- ++ emqx_connector_schema_lib:ssl_fields().
+ ++ emqx_connector_mysql:fields(config).
query(type) -> string();
query(_) -> undefined.
diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl
index cb0bc4df1..fec0abe54 100644
--- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl
+++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl
@@ -54,9 +54,9 @@ fields(?CONF_NS) ->
, {backend, emqx_authn_schema:backend(postgresql)}
, {password_hash_algorithm, fun emqx_authn_password_hashing:type_ro/1}
, {query, fun query/1}
- ] ++ emqx_authn_schema:common_fields()
- ++ emqx_connector_schema_lib:relational_db_fields()
- ++ emqx_connector_schema_lib:ssl_fields().
+ ] ++
+ emqx_authn_schema:common_fields() ++
+ proplists:delete(named_queries, emqx_connector_pgsql:fields(config)).
query(type) -> string();
query(_) -> undefined.
diff --git a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl
index 4e69c6fcc..f003456ad 100644
--- a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl
@@ -19,13 +19,13 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(MONGO_HOST, "mongo").
--define(MONGO_PORT, 27017).
-define(MONGO_CLIENT, 'emqx_authn_mongo_SUITE_client').
-define(PATH, [authentication]).
@@ -47,7 +47,7 @@ end_per_testcase(_TestCase, _Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -386,16 +386,13 @@ drop_seeds() ->
ok.
mongo_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?MONGO_HOST, ?MONGO_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?MONGO_HOST])).
mongo_config() ->
[
{database, <<"mqtt">>},
{host, ?MONGO_HOST},
- {port, ?MONGO_PORT},
+ {port, ?MONGO_DEFAULT_PORT},
{register, ?MONGO_CLIENT}
].
diff --git a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl
index 0531ce249..0e36d125c 100644
--- a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl
@@ -19,6 +19,7 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@@ -26,7 +27,6 @@
-define(MONGO_HOST, "mongo-tls").
--define(MONGO_PORT, 27017).
-define(PATH, [authentication]).
@@ -43,7 +43,7 @@ init_per_testcase(_TestCase, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -78,8 +78,8 @@ t_create(_Config) ->
<<"versions">> => [<<"tlsv1.2">>],
<<"ciphers">> => [<<"ECDHE-RSA-AES256-GCM-SHA384">>]}),
fun({ok, _}, Trace) ->
- ?assertEqual(
- [ok],
+ ?assertMatch(
+ [ok | _],
?projection(
status,
?of_kind(emqx_connector_mongo_health_check, Trace)))
@@ -100,7 +100,7 @@ t_create_invalid_server_name(_Config) ->
end).
-%% docker-compose-mongo-single-tls.yaml:
+%% docker-compose-mongo-single-tls.yaml:
%% --tlsDisabledProtocols TLS1_0,TLS1_1
t_create_invalid_version(_Config) ->
@@ -169,15 +169,12 @@ raw_mongo_auth_config(SpecificSSLOpts) ->
topology => #{
server_selection_timeout_ms => <<"10000ms">>
},
-
+
ssl => maps:merge(SSLOpts, SpecificSSLOpts)
}.
mongo_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?MONGO_HOST, ?MONGO_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?MONGO_HOST])).
start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps).
diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl
index 489a602c0..5ad7ea795 100644
--- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl
@@ -19,12 +19,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(MYSQL_HOST, "mysql").
--define(MYSQL_PORT, 3306).
-define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>).
-define(PATH, [authentication]).
@@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -391,10 +391,7 @@ drop_seeds() ->
ok = q("DROP TABLE IF EXISTS users").
mysql_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?MYSQL_HOST, ?MYSQL_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?MYSQL_HOST])).
mysql_config() ->
#{auto_reconnect => true,
@@ -402,7 +399,7 @@ mysql_config() ->
username => <<"root">>,
password => <<"public">>,
pool_size => 8,
- server => {?MYSQL_HOST, ?MYSQL_PORT},
+ server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl
index d20b7b50d..a919c91c9 100644
--- a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl
@@ -19,12 +19,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(MYSQL_HOST, "mysql-tls").
--define(MYSQL_PORT, 3306).
-define(PATH, [authentication]).
@@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -132,10 +132,7 @@ raw_mysql_auth_config(SpecificSSLOpts) ->
}.
mysql_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?MYSQL_HOST, ?MYSQL_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?MYSQL_HOST])).
start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps).
diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl
index 6a5e07939..227107763 100644
--- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl
@@ -19,13 +19,13 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-define(PGSQL_HOST, "pgsql").
--define(PGSQL_PORT, 5432).
-define(PGSQL_RESOURCE, <<"emqx_authn_pgsql_SUITE">>).
-define(PATH, [authentication]).
@@ -54,7 +54,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -439,10 +439,7 @@ drop_seeds() ->
ok.
pgsql_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?PGSQL_HOST, ?PGSQL_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?PGSQL_HOST])).
pgsql_config() ->
#{auto_reconnect => true,
@@ -450,7 +447,7 @@ pgsql_config() ->
username => <<"root">>,
password => <<"public">>,
pool_size => 8,
- server => {?PGSQL_HOST, ?PGSQL_PORT},
+ server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl
index b2ec46fd5..72c4a3126 100644
--- a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl
@@ -19,12 +19,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(PGSQL_HOST, "pgsql-tls").
--define(PGSQL_PORT, 5432).
-define(PATH, [authentication]).
@@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -131,14 +131,10 @@ raw_pgsql_auth_config(SpecificSSLOpts) ->
}.
pgsql_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?PGSQL_HOST, ?PGSQL_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?PGSQL_HOST])).
start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps).
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).
-
diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl
index 64805ecb7..3d0356187 100644
--- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl
@@ -19,12 +19,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(REDIS_HOST, "redis").
--define(REDIS_PORT, 6379).
-define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>).
-define(PATH, [authentication]).
@@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -380,10 +380,7 @@ drop_seeds() ->
user_seeds()).
redis_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?REDIS_HOST, ?REDIS_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?REDIS_HOST])).
redis_config() ->
#{auto_reconnect => true,
@@ -391,7 +388,7 @@ redis_config() ->
pool_size => 8,
redis_type => single,
password => "public",
- server => {?REDIS_HOST, ?REDIS_PORT},
+ server => {?REDIS_HOST, ?REDIS_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl
index 84f67937b..b3f456622 100644
--- a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl
@@ -19,12 +19,13 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(REDIS_HOST, "redis-tls").
--define(REDIS_PORT, 6380).
+-define(REDIS_TLS_PORT, 6380).
-define(PATH, [authentication]).
@@ -44,7 +45,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_TLS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
@@ -127,10 +128,7 @@ raw_redis_auth_config(SpecificSSLOpts) ->
}.
redis_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?REDIS_HOST, ?REDIS_PORT])).
+ iolist_to_binary(io_lib:format("~s:~b",[?REDIS_HOST, ?REDIS_TLS_PORT])).
start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps).
diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl
index 191e0bd78..779cc52ae 100644
--- a/apps/emqx_authz/src/emqx_authz_schema.erl
+++ b/apps/emqx_authz/src/emqx_authz_schema.erl
@@ -144,12 +144,10 @@ fields(redis_cluster) ->
[ {cmd, query()} ].
http_common_fields() ->
- [ {type, #{type => http}}
- , {enable, #{type => boolean(), default => true}}
- , {url, fun url/1}
+ [ {url, fun url/1}
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
, {body, #{type => map(), nullable => true}}
- ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
+ ] ++ proplists:delete(base_url, connector_fields(http)).
mongo_common_fields() ->
[ {collection, #{type => atom()}}
diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl
index 03df47eac..972dea0d3 100644
--- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl
@@ -26,7 +26,10 @@
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
--define(MONGO_SINGLE_HOST, "mongo:27017").
+-define(MONGO_SINGLE_HOST, "mongo").
+-define(MYSQL_HOST, "mysql:3306").
+-define(PGSQL_HOST, "pgsql").
+-define(REDIS_SINGLE_HOST, "redis").
-define(SOURCE1, #{<<"type">> => <<"http">>,
<<"enable">> => true,
@@ -48,7 +51,7 @@
}).
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
<<"enable">> => true,
- <<"server">> => <<"mysql:3306">>,
+ <<"server">> => <>,
<<"pool_size">> => 1,
<<"database">> => <<"mqtt">>,
<<"username">> => <<"xx">>,
@@ -59,7 +62,7 @@
}).
-define(SOURCE4, #{<<"type">> => <<"postgresql">>,
<<"enable">> => true,
- <<"server">> => <<"pgsql:5432">>,
+ <<"server">> => <>,
<<"pool_size">> => 1,
<<"database">> => <<"mqtt">>,
<<"username">> => <<"xx">>,
@@ -70,7 +73,7 @@
}).
-define(SOURCE5, #{<<"type">> => <<"redis">>,
<<"enable">> => true,
- <<"servers">> => <<"redis:6379,127.0.0.1:6380">>,
+ <<"servers">> => <>,
<<"pool_size">> => 1,
<<"database">> => 0,
<<"password">> => <<"ee">>,
diff --git a/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl
index 4b990f125..3d9bdf38d 100644
--- a/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl
@@ -18,13 +18,13 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authz.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-define(MONGO_HOST, "mongo").
--define(MONGO_PORT, 27017).
-define(MONGO_CLIENT, 'emqx_authz_mongo_SUITE_client').
all() ->
@@ -34,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
@@ -237,16 +237,13 @@ raw_mongo_authz_config() ->
}.
mongo_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?MONGO_HOST, ?MONGO_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?MONGO_HOST])).
mongo_config() ->
[
{database, <<"mqtt">>},
{host, ?MONGO_HOST},
- {port, ?MONGO_PORT},
+ {port, ?MONGO_DEFAULT_PORT},
{register, ?MONGO_CLIENT}
].
diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
index 0cccd748e..c684cd5ae 100644
--- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
@@ -18,13 +18,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authz.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-
-define(MYSQL_HOST, "mysql").
--define(MYSQL_PORT, 3306).
-define(MYSQL_RESOURCE, <<"emqx_authz_mysql_SUITE">>).
all() ->
@@ -34,7 +33,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
@@ -274,10 +273,7 @@ setup_config(SpecialParams) ->
SpecialParams).
mysql_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?MYSQL_HOST, ?MYSQL_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?MYSQL_HOST])).
mysql_config() ->
#{auto_reconnect => true,
@@ -285,7 +281,7 @@ mysql_config() ->
username => <<"root">>,
password => <<"public">>,
pool_size => 8,
- server => {?MYSQL_HOST, ?MYSQL_PORT},
+ server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl
index fbc2cc922..41fe61ef5 100644
--- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl
@@ -18,13 +18,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authz.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-
-define(PGSQL_HOST, "pgsql").
--define(PGSQL_PORT, 5432).
-define(PGSQL_RESOURCE, <<"emqx_authz_pgsql_SUITE">>).
all() ->
@@ -34,7 +33,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
@@ -278,10 +277,7 @@ setup_config(SpecialParams) ->
SpecialParams).
pgsql_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?PGSQL_HOST, ?PGSQL_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?PGSQL_HOST])).
pgsql_config() ->
#{auto_reconnect => true,
@@ -289,7 +285,7 @@ pgsql_config() ->
username => <<"root">>,
password => <<"public">>,
pool_size => 8,
- server => {?PGSQL_HOST, ?PGSQL_PORT},
+ server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl
index 83699f51c..a8d5bed34 100644
--- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl
@@ -19,13 +19,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include("emqx_authz.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-
-define(REDIS_HOST, "redis").
--define(REDIS_PORT, 6379).
-define(REDIS_RESOURCE, <<"emqx_authz_redis_SUITE">>).
all() ->
@@ -35,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
@@ -219,10 +218,7 @@ raw_redis_authz_config() ->
}.
redis_server() ->
- iolist_to_binary(
- io_lib:format(
- "~s:~b",
- [?REDIS_HOST, ?REDIS_PORT])).
+ iolist_to_binary(io_lib:format("~s",[?REDIS_HOST])).
q(Command) ->
emqx_resource:query(
@@ -235,7 +231,7 @@ redis_config() ->
pool_size => 8,
redis_type => single,
password => "public",
- server => {?REDIS_HOST, ?REDIS_PORT},
+ server => {?REDIS_HOST, ?REDIS_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl
index fb299b19b..ed9f4af65 100644
--- a/apps/emqx_connector/include/emqx_connector.hrl
+++ b/apps/emqx_connector/include/emqx_connector.hrl
@@ -1,4 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
-define(VALID, emqx_resource_validator).
-define(NOT_EMPTY(MSG), ?VALID:not_empty(MSG)).
-define(MAX(MAXV), ?VALID:max(number, MAXV)).
-define(MIN(MINV), ?VALID:min(number, MINV)).
+
+-define(MYSQL_DEFAULT_PORT, 3306).
+-define(MONGO_DEFAULT_PORT, 27017).
+-define(REDIS_DEFAULT_PORT, 6379).
+-define(PGSQL_DEFAULT_PORT, 5432).
+
+-define(SERVERS_DESC, "A Node list for Cluster to connect to. The nodes should be splited with ',', such as: 'Node[,Node]'
\nFor each Node should be:
").
+
+-define(SERVER_DESC(TYPE, DEFAULT_PORT), """
+The IPv4 or IPv6 address or host name to connect to.
+A host entry has the following form: 'Host[:Port]'
+The """ ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if '[:Port]' isn't present"
+).
+
+-define(THROW_ERROR(Str), erlang:throw({error, Str})).
diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl
index f3cb05d4f..2416dab7b 100644
--- a/apps/emqx_connector/src/emqx_connector_ldap.erl
+++ b/apps/emqx_connector/src/emqx_connector_ldap.erl
@@ -135,7 +135,7 @@ connect(Opts) ->
{ok, LDAP}.
ldap_fields() ->
- [ {servers, fun emqx_connector_schema_lib:servers/1}
+ [ {servers, fun servers/1}
, {port, fun port/1}
, {pool_size, fun emqx_connector_schema_lib:pool_size/1}
, {bind_dn, fun bind_dn/1}
@@ -144,6 +144,11 @@ ldap_fields() ->
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
].
+servers(type) -> list();
+servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
+servers(converter) -> fun to_servers_raw/1;
+servers(_) -> undefined.
+
bind_dn(type) -> binary();
bind_dn(default) -> 0;
bind_dn(_) -> undefined.
@@ -154,3 +159,20 @@ port(_) -> undefined.
duration(type) -> emqx_schema:duration_ms();
duration(_) -> undefined.
+
+to_servers_raw(Servers) ->
+ {ok, lists:map( fun(Server) ->
+ case string:tokens(Server, ": ") of
+ [Ip] ->
+ [{host, Ip}];
+ [Ip, Port] ->
+ [{host, Ip}, {port, list_to_integer(Port)}]
+ end
+ end, string:tokens(str(Servers), ", "))}.
+
+str(A) when is_atom(A) ->
+ atom_to_list(A);
+str(B) when is_binary(B) ->
+ binary_to_list(B);
+str(S) when is_list(S) ->
+ S.
diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl
index 6e8d2415e..05f30c2ee 100644
--- a/apps/emqx_connector/src/emqx_connector_mongo.erl
+++ b/apps/emqx_connector/src/emqx_connector_mongo.erl
@@ -20,10 +20,6 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
--type server() :: emqx_schema:ip_port().
--reflect_type([server/0]).
--typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}).
-
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
@@ -42,13 +38,18 @@
-define(HEALTH_CHECK_TIMEOUT, 10000).
+%% mongo servers don't need parse
+-define( MONGO_HOST_OPTIONS
+ , #{ host_type => hostname
+ , default_port => ?MONGO_DEFAULT_PORT}).
+
%%=====================================================================
roots() ->
[ {config, #{type => hoconsc:union(
- [ hoconsc:ref(?MODULE, single)
- , hoconsc:ref(?MODULE, rs)
- , hoconsc:ref(?MODULE, sharded)
- ])}}
+ [ hoconsc:ref(?MODULE, single)
+ , hoconsc:ref(?MODULE, rs)
+ , hoconsc:ref(?MODULE, sharded)
+ ])}}
].
fields(single) ->
@@ -284,12 +285,21 @@ init_worker_options([_ | R], Acc) ->
init_worker_options(R, Acc);
init_worker_options([], Acc) -> Acc.
-server(type) -> binary();
+%% ===================================================================
+%% Schema funcs
+
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server_raw/1;
+server(desc) -> ?SERVER_DESC("MongoDB", integer_to_list(?MONGO_DEFAULT_PORT));
server(_) -> undefined.
servers(type) -> binary();
+servers(nullable) -> false;
servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
+servers(converter) -> fun to_servers_raw/1;
+servers(desc) -> ?SERVERS_DESC ++ server(desc);
servers(_) -> undefined.
w_mode(type) -> hoconsc:enum([unsafe, safe]);
@@ -312,19 +322,12 @@ srv_record(type) -> boolean();
srv_record(default) -> false;
srv_record(_) -> undefined.
-parse_servers(Type, Servers) when is_binary(Servers) ->
- parse_servers(Type, binary_to_list(Servers));
-parse_servers(Type, Servers) when is_list(Servers) ->
- case string:split(Servers, ",", all) of
- [Host | _] when Type =:= single ->
- [Host];
- Hosts ->
- Hosts
- end.
+%% ===================================================================
+%% Internal funcs
may_parse_srv_and_txt_records(#{server := Server} = Config) ->
NConfig = maps:remove(server, Config),
- may_parse_srv_and_txt_records_(NConfig#{servers => Server});
+ may_parse_srv_and_txt_records_(NConfig#{servers => [Server]});
may_parse_srv_and_txt_records(Config) ->
may_parse_srv_and_txt_records_(Config).
@@ -335,47 +338,52 @@ may_parse_srv_and_txt_records_(#{mongo_type := Type,
true ->
error({missing_parameter, replica_set_name});
false ->
- Config#{hosts => parse_servers(Type, Servers)}
+ Config#{hosts => servers_to_bin(Servers)}
end;
may_parse_srv_and_txt_records_(#{mongo_type := Type,
srv_record := true,
servers := Servers} = Config) ->
- NServers = binary_to_list(Servers),
- Hosts = parse_srv_records(Type, NServers),
- ExtraOpts = parse_txt_records(Type, NServers),
+ Hosts = parse_srv_records(Type, Servers),
+ ExtraOpts = parse_txt_records(Type, Servers),
maps:merge(Config#{hosts => Hosts}, ExtraOpts).
-parse_srv_records(Type, Server) ->
- case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of
- [] ->
- error(service_not_found);
- Services ->
- case [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] of
- [H | _] when Type =:= single ->
- [H];
- Hosts ->
- Hosts
- end
+parse_srv_records(Type, Servers) ->
+ Fun = fun(AccIn, {IpOrHost, _Port}) ->
+ case inet_res:lookup("_mongodb._tcp." ++ ip_or_host_to_string(IpOrHost), in, srv) of
+ [] ->
+ error(service_not_found);
+ Services ->
+ [ [server_to_bin({Host, Port}) || {_, _, Port, Host} <- Services]
+ | AccIn]
+ end
+ end,
+ Res = lists:foldl(Fun, [], Servers),
+ case Type of
+ single -> lists:nth(1, Res);
+ _ -> Res
end.
-parse_txt_records(Type, Server) ->
- case inet_res:lookup(Server, in, txt) of
- [] ->
- #{};
- [[QueryString]] ->
- case uri_string:dissect_query(QueryString) of
- {error, _, _} ->
- error({invalid_txt_record, invalid_query_string});
- Options ->
- Fields = case Type of
- rs -> ["authSource", "replicaSet"];
- _ -> ["authSource"]
- end,
- take_and_convert(Fields, Options)
- end;
- _ ->
- error({invalid_txt_record, multiple_records})
- end.
+parse_txt_records(Type, Servers) ->
+ Fun = fun(AccIn, {IpOrHost, _Port}) ->
+ case inet_res:lookup(IpOrHost, in, txt) of
+ [] ->
+ #{};
+ [[QueryString]] ->
+ case uri_string:dissect_query(QueryString) of
+ {error, _, _} ->
+ error({invalid_txt_record, invalid_query_string});
+ Options ->
+ Fields = case Type of
+ rs -> ["authSource", "replicaSet"];
+ _ -> ["authSource"]
+ end,
+ maps:merge(AccIn, take_and_convert(Fields, Options))
+ end;
+ _ ->
+ error({invalid_txt_record, multiple_records})
+ end
+ end,
+ lists:foldl(Fun, #{}, Servers).
take_and_convert(Fields, Options) ->
take_and_convert(Fields, Options, #{}).
@@ -395,3 +403,41 @@ take_and_convert([Field | More], Options, Acc) ->
false ->
take_and_convert(More, Options, Acc)
end.
+
+-spec ip_or_host_to_string(binary() | string() | tuple())
+ -> string().
+ip_or_host_to_string(Ip) when is_tuple(Ip) ->
+ inet:ntoa(Ip);
+ip_or_host_to_string(Host) ->
+ str(Host).
+
+servers_to_bin([Server | Rest]) ->
+ [server_to_bin(Server) | servers_to_bin(Rest)];
+servers_to_bin([]) ->
+ [].
+
+server_to_bin({IpOrHost, Port}) ->
+ iolist_to_binary(ip_or_host_to_string(IpOrHost) ++ ":" ++ integer_to_list(Port)).
+
+%% ===================================================================
+%% typereflt funcs
+
+-spec to_server_raw(string())
+ -> {string(), pos_integer()}.
+to_server_raw(Server) ->
+ emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS).
+
+-spec to_servers_raw(string())
+ -> [{string(), pos_integer()}].
+to_servers_raw(Servers) ->
+ lists:map( fun(Server) ->
+ emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS)
+ end
+ , string:tokens(str(Servers), ", ")).
+
+str(A) when is_atom(A) ->
+ atom_to_list(A);
+str(B) when is_binary(B) ->
+ binary_to_list(B);
+str(S) when is_list(S) ->
+ S.
diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl
index 1af0cc34a..699027b4b 100644
--- a/apps/emqx_connector/src/emqx_connector_mysql.erl
+++ b/apps/emqx_connector/src/emqx_connector_mysql.erl
@@ -15,6 +15,7 @@
%%--------------------------------------------------------------------
-module(emqx_connector_mysql).
+-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
@@ -33,15 +34,28 @@
-export([do_health_check/1]).
+-define( MYSQL_HOST_OPTIONS
+ , #{ host_type => inet_addr
+ , default_port => ?MYSQL_DEFAULT_PORT}).
+
%%=====================================================================
%% Hocon schema
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) ->
+ [ {server, fun server/1}
+ ] ++
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
+server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server/1;
+server(desc) -> ?SERVER_DESC("MySQL", integer_to_list(?MYSQL_DEFAULT_PORT));
+server(_) -> undefined.
+
%% ===================================================================
on_start(InstId, #{server := {Host, Port},
database := DB,
@@ -106,3 +120,8 @@ reconn_interval(false) -> false.
connect(Options) ->
mysql:start_link(Options).
+
+-spec to_server(string())
+ -> {inet:ip_address() | inet:hostname(), pos_integer()}.
+to_server(Str) ->
+ emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS).
diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl
index 9468257ca..9e2c72e0b 100644
--- a/apps/emqx_connector/src/emqx_connector_pgsql.erl
+++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl
@@ -15,6 +15,7 @@
%%--------------------------------------------------------------------
-module(emqx_connector_pgsql).
+-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("epgsql/include/epgsql.hrl").
@@ -38,13 +39,19 @@
-export([do_health_check/1]).
+-define( PGSQL_HOST_OPTIONS
+ , #{ host_type => inet_addr
+ , default_port => ?PGSQL_DEFAULT_PORT}).
+
+
%%=====================================================================
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) ->
- [{named_queries, fun named_queries/1}] ++
+ [ {named_queries, fun named_queries/1}
+ , {server, fun server/1}] ++
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
@@ -52,6 +59,13 @@ named_queries(type) -> map();
named_queries(nullable) -> true;
named_queries(_) -> undefined.
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
+server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server/1;
+server(desc) -> ?SERVER_DESC("PostgreSQL", integer_to_list(?PGSQL_DEFAULT_PORT));
+server(_) -> undefined.
+
%% ===================================================================
on_start(InstId, #{server := {Host, Port},
database := DB,
@@ -163,3 +177,11 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]);
conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc).
+
+%% ===================================================================
+%% typereflt funcs
+
+-spec to_server(string())
+ -> {inet:ip_address() | inet:hostname(), pos_integer()}.
+to_server(Str) ->
+ emqx_connector_schema_lib:parse_server(Str, ?PGSQL_HOST_OPTIONS).
diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl
index fbec547f7..f25ef4331 100644
--- a/apps/emqx_connector/src/emqx_connector_redis.erl
+++ b/apps/emqx_connector/src/emqx_connector_redis.erl
@@ -19,21 +19,6 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
--type server() :: tuple().
-%% {"127.0.0.1", 7000}
-%% For eredis:start_link/1~7
--reflect_type([server/0]).
--typerefl_from_string({server/0, ?MODULE, to_server}).
-
--type servers() :: list().
-%% [{"127.0.0.1", 7000}, {"127.0.0.2", 7000}]
-%% For eredis_cluster
--reflect_type([servers/0]).
--typerefl_from_string({servers/0, ?MODULE, to_servers}).
-
--export([ to_server/1
- , to_servers/1]).
-
-export([roots/0, fields/1]).
-behaviour(emqx_resource).
@@ -51,6 +36,12 @@
-export([cmd/3]).
+%% redis host don't need parse
+-define( REDIS_HOST_OPTIONS
+ , #{ host_type => hostname
+ , default_port => ?REDIS_DEFAULT_PORT}).
+
+
%%=====================================================================
roots() ->
[ {config, #{type => hoconsc:union(
@@ -62,21 +53,21 @@ roots() ->
].
fields(single) ->
- [ {server, #{type => server()}}
+ [ {server, fun server/1}
, {redis_type, #{type => hoconsc:enum([single]),
default => single}}
] ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(cluster) ->
- [ {servers, #{type => servers()}}
+ [ {servers, fun servers/1}
, {redis_type, #{type => hoconsc:enum([cluster]),
default => cluster}}
] ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(sentinel) ->
- [ {servers, #{type => servers()}}
+ [ {servers, fun servers/1}
, {redis_type, #{type => hoconsc:enum([sentinel]),
default => sentinel}}
, {sentinel, #{type => string()}}
@@ -84,6 +75,20 @@ fields(sentinel) ->
redis_fields() ++
emqx_connector_schema_lib:ssl_fields().
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
+server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server_raw/1;
+server(desc) -> ?SERVER_DESC("Redis", integer_to_list(?REDIS_DEFAULT_PORT));
+server(_) -> undefined.
+
+servers(type) -> list();
+servers(nullable) -> false;
+servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
+servers(converter) -> fun to_servers_raw/1;
+servers(desc) -> ?SERVERS_DESC ++ server(desc);
+servers(_) -> undefined.
+
%% ===================================================================
on_start(InstId, #{redis_type := Type,
database := Database,
@@ -185,24 +190,22 @@ redis_fields() ->
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
].
-to_server(Server) ->
- try {ok, parse_server(Server)}
- catch
- throw : Error ->
- Error
- end.
+-spec to_server_raw(string())
+ -> {string(), pos_integer()}.
+to_server_raw(Server) ->
+ emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS).
-to_servers(Servers) ->
- try {ok, lists:map(fun parse_server/1, string:tokens(Servers, ", "))}
- catch
- throw : _Reason ->
- {error, Servers}
- end.
+-spec to_servers_raw(string())
+ -> [{string(), pos_integer()}].
+to_servers_raw(Servers) ->
+ lists:map( fun(Server) ->
+ emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS)
+ end
+ , string:tokens(str(Servers), ", ")).
-parse_server(Server) ->
- case string:tokens(Server, ": ") of
- [Host, Port] ->
- {Host, list_to_integer(Port)};
- _ ->
- throw({error, Server})
- end.
+str(A) when is_atom(A) ->
+ atom_to_list(A);
+str(B) when is_binary(B) ->
+ binary_to_list(B);
+str(S) when is_list(S) ->
+ S.
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
index 33d10802b..7f1be401b 100644
--- a/apps/emqx_connector/src/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -1,3 +1,18 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
-module(emqx_connector_schema).
-behaviour(hocon_schema).
diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
index 3a3a650ed..a43cab0d4 100644
--- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
@@ -22,34 +22,27 @@
, ssl_fields/0
]).
--export([ to_ip_port/1
- , ip_port_to_string/1
- , to_servers/1
+-export([ ip_port_to_string/1
+ , parse_server/2
]).
-export([ pool_size/1
, database/1
, username/1
, password/1
- , servers/1
, auto_reconnect/1
]).
--typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}).
--typerefl_from_string({servers/0, emqx_connector_schema_lib, to_servers}).
-
-type database() :: binary().
-type pool_size() :: integer().
-type username() :: binary().
-type password() :: binary().
--type servers() :: list().
-reflect_type([ database/0
, pool_size/0
, username/0
, password/0
- , servers/0
- ]).
+ ]).
-export([roots/0, fields/1]).
@@ -65,19 +58,13 @@ ssl_fields() ->
].
relational_db_fields() ->
- [ {server, fun server/1}
- , {database, fun database/1}
+ [ {database, fun database/1}
, {pool_size, fun pool_size/1}
, {username, fun username/1}
, {password, fun password/1}
, {auto_reconnect, fun auto_reconnect/1}
].
-server(type) -> emqx_schema:ip_port();
-server(nullable) -> false;
-server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
-server(_) -> undefined.
-
database(type) -> binary();
database(nullable) -> false;
database(validator) -> [?NOT_EMPTY("the value of the field 'database' cannot be empty")];
@@ -100,31 +87,59 @@ auto_reconnect(type) -> boolean();
auto_reconnect(default) -> true;
auto_reconnect(_) -> undefined.
-servers(type) -> servers();
-servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
-servers(_) -> undefined.
-
-to_ip_port(Str) ->
- case string:tokens(Str, ": ") of
- [Ip, Port] ->
- case inet:parse_address(Ip) of
- {ok, R} -> {ok, {R, list_to_integer(Port)}};
- _ -> {error, Str}
- end;
- _ -> {error, Str}
- end.
-
ip_port_to_string({Ip, Port}) when is_list(Ip) ->
iolist_to_binary([Ip, ":", integer_to_list(Port)]);
ip_port_to_string({Ip, Port}) when is_tuple(Ip) ->
iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]).
-to_servers(Str) ->
- {ok, lists:map(fun(Server) ->
- case string:tokens(Server, ": ") of
- [Ip] ->
- [{host, Ip}];
- [Ip, Port] ->
- [{host, Ip}, {port, list_to_integer(Port)}]
- end
- end, string:tokens(Str, " , "))}.
+parse_server(Str, #{host_type := inet_addr, default_port := DefaultPort}) ->
+ try string:tokens(str(Str), ": ") of
+ [Ip, Port] ->
+ case parse_ip(Ip) of
+ {ok, R} -> {R, list_to_integer(Port)}
+ end;
+ [Ip] ->
+ case parse_ip(Ip) of
+ {ok, R} -> {R, DefaultPort}
+ end;
+ _ ->
+ ?THROW_ERROR("Bad server schema.")
+ catch
+ error : Reason ->
+ ?THROW_ERROR(Reason)
+ end;
+parse_server(Str, #{host_type := hostname, default_port := DefaultPort}) ->
+ try string:tokens(str(Str), ": ") of
+ [Ip, Port] ->
+ {Ip, list_to_integer(Port)};
+ [Ip] ->
+ {Ip, DefaultPort};
+ _ ->
+ ?THROW_ERROR("Bad server schema.")
+ catch
+ error : Reason ->
+ ?THROW_ERROR(Reason)
+ end;
+parse_server(_, _) ->
+ ?THROW_ERROR("Invalid Host").
+
+parse_ip(Str) ->
+ case inet:parse_address(Str) of
+ {ok, R} ->
+ {ok, R};
+ _ ->
+ %% check is a rfc1035's hostname
+ case inet_parse:domain(Str) of
+ true ->
+ {ok, Str};
+ _ ->
+ ?THROW_ERROR("Bad IP or Host")
+ end
+ end.
+
+str(A) when is_atom(A) ->
+ atom_to_list(A);
+str(B) when is_binary(B) ->
+ binary_to_list(B);
+str(S) when is_list(S) ->
+ S.
diff --git a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl
index 286fbfcf3..6e4ca1c8f 100644
--- a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl
@@ -18,12 +18,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(MONGO_HOST, "mongo").
--define(MONGO_PORT, 27017).
-define(MONGO_CLIENT, 'emqx_connector_mongo_SUITE_client').
all() ->
@@ -33,7 +33,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_connector_test_helpers:start_apps([ecpool, mongodb]),
Config;
@@ -129,5 +129,5 @@ mongo_config() ->
pool_size => 8,
ssl => #{enable => false},
srv_record => false,
- servers => <<"127.0.0.1:27017">>
+ server => {<>, ?MONGO_DEFAULT_PORT}
}.
diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl
index 7ffca4c39..bfccc061f 100644
--- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl
@@ -18,12 +18,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(MYSQL_HOST, "mysql").
--define(MYSQL_PORT, 3306).
all() ->
emqx_common_test_helpers:all(?MODULE).
@@ -32,15 +32,16 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
+ ok = emqx_connector_test_helpers:start_apps([ecpool, mysql]),
Config;
false ->
{skip, no_mysql}
end.
end_per_suite(_Config) ->
- ok.
+ ok = emqx_connector_test_helpers:stop_apps([ecpool, mysql]).
init_per_testcase(_, Config) ->
?assertEqual(
@@ -124,7 +125,7 @@ mysql_config() ->
username => <<"root">>,
password => <<"public">>,
pool_size => 8,
- server => {?MYSQL_HOST, ?MYSQL_PORT},
+ server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
index 8f04bd71a..dacb5dab0 100644
--- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
@@ -18,12 +18,12 @@
-compile(nowarn_export_all).
-compile(export_all).
+-include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(PGSQL_HOST, "pgsql").
--define(PGSQL_PORT, 5432).
all() ->
emqx_common_test_helpers:all(?MODULE).
@@ -32,15 +32,16 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
+ ok = emqx_connector_test_helpers:start_apps([ecpool, pgsql]),
Config;
false ->
{skip, no_pgsql}
end.
end_per_suite(_Config) ->
- ok.
+ ok = emqx_connector_test_helpers:stop_apps([ecpool, pgsql]).
init_per_testcase(_, Config) ->
?assertEqual(
@@ -124,7 +125,7 @@ pgsql_config() ->
username => <<"root">>,
password => <<"public">>,
pool_size => 8,
- server => {?PGSQL_HOST, ?PGSQL_PORT},
+ server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
ssl => #{enable => false}
}.
@@ -135,7 +136,7 @@ pgsql_bad_config() ->
username => <<"bad_root">>,
password => <<"bad_public">>,
pool_size => 8,
- server => {?PGSQL_HOST, ?PGSQL_PORT},
+ server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
ssl => #{enable => false}
}.
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
index 2649093bf..dcb3a81e2 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
@@ -445,8 +445,6 @@ typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/fil
typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>};
typename_to_spec("ip_ports()", _Mod) -> #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>};
typename_to_spec("url()", _Mod) -> #{type => string, example => <<"http://127.0.0.1">>};
-typename_to_spec("server()", Mod) -> typename_to_spec("ip_port()", Mod);
-typename_to_spec("servers()", Mod) -> typename_to_spec("ip_ports()", Mod);
typename_to_spec("connect_timeout()", Mod) -> typename_to_spec("timeout()", Mod);
typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, example => infinity},
#{type => integer, example => 100}], example => infinity};
diff --git a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl
index b4785cf1a..be27b954e 100644
--- a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl
+++ b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl
@@ -361,7 +361,7 @@ schema("/ref/complicated_type") ->
200 => [
{no_neg_integer, hoconsc:mk(non_neg_integer(), #{})},
{url, hoconsc:mk(emqx_connector_http:url(), #{})},
- {server, hoconsc:mk(emqx_connector_redis:server(), #{})},
+ {server, hoconsc:mk(emqx_schema:ip_port(), #{})},
{connect_timeout, hoconsc:mk(emqx_connector_http:connect_timeout(), #{})},
{pool_type, hoconsc:mk(emqx_connector_http:pool_type(), #{})},
{timeout, hoconsc:mk(timeout(), #{})},