Merge pull request #6888 from JimMoen/connector-default-port
Connector for DB provide default port
This commit is contained in:
commit
43927abb69
2
Makefile
2
Makefile
|
@ -129,7 +129,7 @@ $(PROFILES:%=clean-%):
|
||||||
rm rebar.lock \
|
rm rebar.lock \
|
||||||
rm -rf _build/$(@:clean-%=%)/rel; \
|
rm -rf _build/$(@:clean-%=%)/rel; \
|
||||||
$(FIND) _build/$(@:clean-%=%) -name '*.beam' -o -name '*.so' -o -name '*.app' -o -name '*.appup' -o -name '*.o' -o -name '*.d' -type f | xargs rm -f; \
|
$(FIND) _build/$(@:clean-%=%) -name '*.beam' -o -name '*.so' -o -name '*.app' -o -name '*.appup' -o -name '*.o' -o -name '*.d' -type f | xargs rm -f; \
|
||||||
$(FIND) _build/$(@:clean-%=%) -type l -delete; \
|
$(FIND) _build/$(@:clean-%=%) -type l -delete; \
|
||||||
fi
|
fi
|
||||||
|
|
||||||
.PHONY: clean-all
|
.PHONY: clean-all
|
||||||
|
|
|
@ -50,8 +50,7 @@ fields(?CONF_NS) ->
|
||||||
, {query, fun query/1}
|
, {query, fun query/1}
|
||||||
, {query_timeout, fun query_timeout/1}
|
, {query_timeout, fun query_timeout/1}
|
||||||
] ++ emqx_authn_schema:common_fields()
|
] ++ emqx_authn_schema:common_fields()
|
||||||
++ emqx_connector_schema_lib:relational_db_fields()
|
++ emqx_connector_mysql:fields(config).
|
||||||
++ emqx_connector_schema_lib:ssl_fields().
|
|
||||||
|
|
||||||
query(type) -> string();
|
query(type) -> string();
|
||||||
query(_) -> undefined.
|
query(_) -> undefined.
|
||||||
|
|
|
@ -54,9 +54,9 @@ fields(?CONF_NS) ->
|
||||||
, {backend, emqx_authn_schema:backend(postgresql)}
|
, {backend, emqx_authn_schema:backend(postgresql)}
|
||||||
, {password_hash_algorithm, fun emqx_authn_password_hashing:type_ro/1}
|
, {password_hash_algorithm, fun emqx_authn_password_hashing:type_ro/1}
|
||||||
, {query, fun query/1}
|
, {query, fun query/1}
|
||||||
] ++ emqx_authn_schema:common_fields()
|
] ++
|
||||||
++ emqx_connector_schema_lib:relational_db_fields()
|
emqx_authn_schema:common_fields() ++
|
||||||
++ emqx_connector_schema_lib:ssl_fields().
|
proplists:delete(named_queries, emqx_connector_pgsql:fields(config)).
|
||||||
|
|
||||||
query(type) -> string();
|
query(type) -> string();
|
||||||
query(_) -> undefined.
|
query(_) -> undefined.
|
||||||
|
|
|
@ -19,13 +19,13 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
|
||||||
-define(MONGO_HOST, "mongo").
|
-define(MONGO_HOST, "mongo").
|
||||||
-define(MONGO_PORT, 27017).
|
|
||||||
-define(MONGO_CLIENT, 'emqx_authn_mongo_SUITE_client').
|
-define(MONGO_CLIENT, 'emqx_authn_mongo_SUITE_client').
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
@ -47,7 +47,7 @@ end_per_testcase(_TestCase, _Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -386,16 +386,13 @@ drop_seeds() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
mongo_server() ->
|
mongo_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?MONGO_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?MONGO_HOST, ?MONGO_PORT])).
|
|
||||||
|
|
||||||
mongo_config() ->
|
mongo_config() ->
|
||||||
[
|
[
|
||||||
{database, <<"mqtt">>},
|
{database, <<"mqtt">>},
|
||||||
{host, ?MONGO_HOST},
|
{host, ?MONGO_HOST},
|
||||||
{port, ?MONGO_PORT},
|
{port, ?MONGO_DEFAULT_PORT},
|
||||||
{register, ?MONGO_CLIENT}
|
{register, ?MONGO_CLIENT}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
@ -26,7 +27,6 @@
|
||||||
|
|
||||||
|
|
||||||
-define(MONGO_HOST, "mongo-tls").
|
-define(MONGO_HOST, "mongo-tls").
|
||||||
-define(MONGO_PORT, 27017).
|
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -78,8 +78,8 @@ t_create(_Config) ->
|
||||||
<<"versions">> => [<<"tlsv1.2">>],
|
<<"versions">> => [<<"tlsv1.2">>],
|
||||||
<<"ciphers">> => [<<"ECDHE-RSA-AES256-GCM-SHA384">>]}),
|
<<"ciphers">> => [<<"ECDHE-RSA-AES256-GCM-SHA384">>]}),
|
||||||
fun({ok, _}, Trace) ->
|
fun({ok, _}, Trace) ->
|
||||||
?assertEqual(
|
?assertMatch(
|
||||||
[ok],
|
[ok | _],
|
||||||
?projection(
|
?projection(
|
||||||
status,
|
status,
|
||||||
?of_kind(emqx_connector_mongo_health_check, Trace)))
|
?of_kind(emqx_connector_mongo_health_check, Trace)))
|
||||||
|
@ -174,10 +174,7 @@ raw_mongo_auth_config(SpecificSSLOpts) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
mongo_server() ->
|
mongo_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?MONGO_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?MONGO_HOST, ?MONGO_PORT])).
|
|
||||||
|
|
||||||
start_apps(Apps) ->
|
start_apps(Apps) ->
|
||||||
lists:foreach(fun application:ensure_all_started/1, Apps).
|
lists:foreach(fun application:ensure_all_started/1, Apps).
|
||||||
|
|
|
@ -19,12 +19,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(MYSQL_HOST, "mysql").
|
-define(MYSQL_HOST, "mysql").
|
||||||
-define(MYSQL_PORT, 3306).
|
|
||||||
-define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>).
|
-define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>).
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -391,10 +391,7 @@ drop_seeds() ->
|
||||||
ok = q("DROP TABLE IF EXISTS users").
|
ok = q("DROP TABLE IF EXISTS users").
|
||||||
|
|
||||||
mysql_server() ->
|
mysql_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?MYSQL_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?MYSQL_HOST, ?MYSQL_PORT])).
|
|
||||||
|
|
||||||
mysql_config() ->
|
mysql_config() ->
|
||||||
#{auto_reconnect => true,
|
#{auto_reconnect => true,
|
||||||
|
@ -402,7 +399,7 @@ mysql_config() ->
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
server => {?MYSQL_HOST, ?MYSQL_PORT},
|
server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -19,12 +19,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(MYSQL_HOST, "mysql-tls").
|
-define(MYSQL_HOST, "mysql-tls").
|
||||||
-define(MYSQL_PORT, 3306).
|
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -132,10 +132,7 @@ raw_mysql_auth_config(SpecificSSLOpts) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
mysql_server() ->
|
mysql_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?MYSQL_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?MYSQL_HOST, ?MYSQL_PORT])).
|
|
||||||
|
|
||||||
start_apps(Apps) ->
|
start_apps(Apps) ->
|
||||||
lists:foreach(fun application:ensure_all_started/1, Apps).
|
lists:foreach(fun application:ensure_all_started/1, Apps).
|
||||||
|
|
|
@ -19,13 +19,13 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("emqx/include/emqx_placeholder.hrl").
|
-include_lib("emqx/include/emqx_placeholder.hrl").
|
||||||
|
|
||||||
-define(PGSQL_HOST, "pgsql").
|
-define(PGSQL_HOST, "pgsql").
|
||||||
-define(PGSQL_PORT, 5432).
|
|
||||||
-define(PGSQL_RESOURCE, <<"emqx_authn_pgsql_SUITE">>).
|
-define(PGSQL_RESOURCE, <<"emqx_authn_pgsql_SUITE">>).
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
@ -54,7 +54,7 @@ end_per_group(require_seeds, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -439,10 +439,7 @@ drop_seeds() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
pgsql_server() ->
|
pgsql_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?PGSQL_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?PGSQL_HOST, ?PGSQL_PORT])).
|
|
||||||
|
|
||||||
pgsql_config() ->
|
pgsql_config() ->
|
||||||
#{auto_reconnect => true,
|
#{auto_reconnect => true,
|
||||||
|
@ -450,7 +447,7 @@ pgsql_config() ->
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
server => {?PGSQL_HOST, ?PGSQL_PORT},
|
server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -19,12 +19,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(PGSQL_HOST, "pgsql-tls").
|
-define(PGSQL_HOST, "pgsql-tls").
|
||||||
-define(PGSQL_PORT, 5432).
|
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -131,14 +131,10 @@ raw_pgsql_auth_config(SpecificSSLOpts) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
pgsql_server() ->
|
pgsql_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?PGSQL_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?PGSQL_HOST, ?PGSQL_PORT])).
|
|
||||||
|
|
||||||
start_apps(Apps) ->
|
start_apps(Apps) ->
|
||||||
lists:foreach(fun application:ensure_all_started/1, Apps).
|
lists:foreach(fun application:ensure_all_started/1, Apps).
|
||||||
|
|
||||||
stop_apps(Apps) ->
|
stop_apps(Apps) ->
|
||||||
lists:foreach(fun application:stop/1, Apps).
|
lists:foreach(fun application:stop/1, Apps).
|
||||||
|
|
||||||
|
|
|
@ -19,12 +19,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(REDIS_HOST, "redis").
|
-define(REDIS_HOST, "redis").
|
||||||
-define(REDIS_PORT, 6379).
|
|
||||||
-define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>).
|
-define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>).
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -380,10 +380,7 @@ drop_seeds() ->
|
||||||
user_seeds()).
|
user_seeds()).
|
||||||
|
|
||||||
redis_server() ->
|
redis_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?REDIS_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?REDIS_HOST, ?REDIS_PORT])).
|
|
||||||
|
|
||||||
redis_config() ->
|
redis_config() ->
|
||||||
#{auto_reconnect => true,
|
#{auto_reconnect => true,
|
||||||
|
@ -391,7 +388,7 @@ redis_config() ->
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
redis_type => single,
|
redis_type => single,
|
||||||
password => "public",
|
password => "public",
|
||||||
server => {?REDIS_HOST, ?REDIS_PORT},
|
server => {?REDIS_HOST, ?REDIS_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -19,12 +19,13 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(REDIS_HOST, "redis-tls").
|
-define(REDIS_HOST, "redis-tls").
|
||||||
-define(REDIS_PORT, 6380).
|
-define(REDIS_TLS_PORT, 6380).
|
||||||
|
|
||||||
-define(PATH, [authentication]).
|
-define(PATH, [authentication]).
|
||||||
|
|
||||||
|
@ -44,7 +45,7 @@ init_per_testcase(_, Config) ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_conf),
|
_ = application:load(emqx_conf),
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_TLS_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
|
||||||
ok = start_apps([emqx_resource, emqx_connector]),
|
ok = start_apps([emqx_resource, emqx_connector]),
|
||||||
|
@ -127,10 +128,7 @@ raw_redis_auth_config(SpecificSSLOpts) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
redis_server() ->
|
redis_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s:~b",[?REDIS_HOST, ?REDIS_TLS_PORT])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?REDIS_HOST, ?REDIS_PORT])).
|
|
||||||
|
|
||||||
start_apps(Apps) ->
|
start_apps(Apps) ->
|
||||||
lists:foreach(fun application:ensure_all_started/1, Apps).
|
lists:foreach(fun application:ensure_all_started/1, Apps).
|
||||||
|
|
|
@ -144,12 +144,10 @@ fields(redis_cluster) ->
|
||||||
[ {cmd, query()} ].
|
[ {cmd, query()} ].
|
||||||
|
|
||||||
http_common_fields() ->
|
http_common_fields() ->
|
||||||
[ {type, #{type => http}}
|
[ {url, fun url/1}
|
||||||
, {enable, #{type => boolean(), default => true}}
|
|
||||||
, {url, fun url/1}
|
|
||||||
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
||||||
, {body, #{type => map(), nullable => true}}
|
, {body, #{type => map(), nullable => true}}
|
||||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
|
] ++ proplists:delete(base_url, connector_fields(http)).
|
||||||
|
|
||||||
mongo_common_fields() ->
|
mongo_common_fields() ->
|
||||||
[ {collection, #{type => atom()}}
|
[ {collection, #{type => atom()}}
|
||||||
|
|
|
@ -26,7 +26,10 @@
|
||||||
-define(HOST, "http://127.0.0.1:18083/").
|
-define(HOST, "http://127.0.0.1:18083/").
|
||||||
-define(API_VERSION, "v5").
|
-define(API_VERSION, "v5").
|
||||||
-define(BASE_PATH, "api").
|
-define(BASE_PATH, "api").
|
||||||
-define(MONGO_SINGLE_HOST, "mongo:27017").
|
-define(MONGO_SINGLE_HOST, "mongo").
|
||||||
|
-define(MYSQL_HOST, "mysql:3306").
|
||||||
|
-define(PGSQL_HOST, "pgsql").
|
||||||
|
-define(REDIS_SINGLE_HOST, "redis").
|
||||||
|
|
||||||
-define(SOURCE1, #{<<"type">> => <<"http">>,
|
-define(SOURCE1, #{<<"type">> => <<"http">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
|
@ -48,7 +51,7 @@
|
||||||
}).
|
}).
|
||||||
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
|
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"server">> => <<"mysql:3306">>,
|
<<"server">> => <<?MYSQL_HOST>>,
|
||||||
<<"pool_size">> => 1,
|
<<"pool_size">> => 1,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"username">> => <<"xx">>,
|
||||||
|
@ -59,7 +62,7 @@
|
||||||
}).
|
}).
|
||||||
-define(SOURCE4, #{<<"type">> => <<"postgresql">>,
|
-define(SOURCE4, #{<<"type">> => <<"postgresql">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"server">> => <<"pgsql:5432">>,
|
<<"server">> => <<?PGSQL_HOST>>,
|
||||||
<<"pool_size">> => 1,
|
<<"pool_size">> => 1,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"username">> => <<"xx">>,
|
||||||
|
@ -70,7 +73,7 @@
|
||||||
}).
|
}).
|
||||||
-define(SOURCE5, #{<<"type">> => <<"redis">>,
|
-define(SOURCE5, #{<<"type">> => <<"redis">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"servers">> => <<"redis:6379,127.0.0.1:6380">>,
|
<<"servers">> => <<?REDIS_SINGLE_HOST, ",127.0.0.1:6380">>,
|
||||||
<<"pool_size">> => 1,
|
<<"pool_size">> => 1,
|
||||||
<<"database">> => 0,
|
<<"database">> => 0,
|
||||||
<<"password">> => <<"ee">>,
|
<<"password">> => <<"ee">>,
|
||||||
|
|
|
@ -18,13 +18,13 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authz.hrl").
|
-include("emqx_authz.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("emqx/include/emqx_placeholder.hrl").
|
-include_lib("emqx/include/emqx_placeholder.hrl").
|
||||||
|
|
||||||
-define(MONGO_HOST, "mongo").
|
-define(MONGO_HOST, "mongo").
|
||||||
-define(MONGO_PORT, 27017).
|
|
||||||
-define(MONGO_CLIENT, 'emqx_authz_mongo_SUITE_client').
|
-define(MONGO_CLIENT, 'emqx_authz_mongo_SUITE_client').
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -34,7 +34,7 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
[emqx_conf, emqx_authz],
|
[emqx_conf, emqx_authz],
|
||||||
|
@ -237,16 +237,13 @@ raw_mongo_authz_config() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
mongo_server() ->
|
mongo_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?MONGO_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?MONGO_HOST, ?MONGO_PORT])).
|
|
||||||
|
|
||||||
mongo_config() ->
|
mongo_config() ->
|
||||||
[
|
[
|
||||||
{database, <<"mqtt">>},
|
{database, <<"mqtt">>},
|
||||||
{host, ?MONGO_HOST},
|
{host, ?MONGO_HOST},
|
||||||
{port, ?MONGO_PORT},
|
{port, ?MONGO_DEFAULT_PORT},
|
||||||
{register, ?MONGO_CLIENT}
|
{register, ?MONGO_CLIENT}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
|
@ -18,13 +18,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authz.hrl").
|
-include("emqx_authz.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
|
||||||
-define(MYSQL_HOST, "mysql").
|
-define(MYSQL_HOST, "mysql").
|
||||||
-define(MYSQL_PORT, 3306).
|
|
||||||
-define(MYSQL_RESOURCE, <<"emqx_authz_mysql_SUITE">>).
|
-define(MYSQL_RESOURCE, <<"emqx_authz_mysql_SUITE">>).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -34,7 +33,7 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
[emqx_conf, emqx_authz],
|
[emqx_conf, emqx_authz],
|
||||||
|
@ -274,10 +273,7 @@ setup_config(SpecialParams) ->
|
||||||
SpecialParams).
|
SpecialParams).
|
||||||
|
|
||||||
mysql_server() ->
|
mysql_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?MYSQL_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?MYSQL_HOST, ?MYSQL_PORT])).
|
|
||||||
|
|
||||||
mysql_config() ->
|
mysql_config() ->
|
||||||
#{auto_reconnect => true,
|
#{auto_reconnect => true,
|
||||||
|
@ -285,7 +281,7 @@ mysql_config() ->
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
server => {?MYSQL_HOST, ?MYSQL_PORT},
|
server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -18,13 +18,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authz.hrl").
|
-include("emqx_authz.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
|
||||||
-define(PGSQL_HOST, "pgsql").
|
-define(PGSQL_HOST, "pgsql").
|
||||||
-define(PGSQL_PORT, 5432).
|
|
||||||
-define(PGSQL_RESOURCE, <<"emqx_authz_pgsql_SUITE">>).
|
-define(PGSQL_RESOURCE, <<"emqx_authz_pgsql_SUITE">>).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -34,7 +33,7 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
[emqx_conf, emqx_authz],
|
[emqx_conf, emqx_authz],
|
||||||
|
@ -278,10 +277,7 @@ setup_config(SpecialParams) ->
|
||||||
SpecialParams).
|
SpecialParams).
|
||||||
|
|
||||||
pgsql_server() ->
|
pgsql_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?PGSQL_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?PGSQL_HOST, ?PGSQL_PORT])).
|
|
||||||
|
|
||||||
pgsql_config() ->
|
pgsql_config() ->
|
||||||
#{auto_reconnect => true,
|
#{auto_reconnect => true,
|
||||||
|
@ -289,7 +285,7 @@ pgsql_config() ->
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
server => {?PGSQL_HOST, ?PGSQL_PORT},
|
server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -19,13 +19,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include("emqx_authz.hrl").
|
-include("emqx_authz.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
|
||||||
-define(REDIS_HOST, "redis").
|
-define(REDIS_HOST, "redis").
|
||||||
-define(REDIS_PORT, 6379).
|
|
||||||
-define(REDIS_RESOURCE, <<"emqx_authz_redis_SUITE">>).
|
-define(REDIS_RESOURCE, <<"emqx_authz_redis_SUITE">>).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -35,7 +34,7 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
[emqx_conf, emqx_authz],
|
[emqx_conf, emqx_authz],
|
||||||
|
@ -219,10 +218,7 @@ raw_redis_authz_config() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
redis_server() ->
|
redis_server() ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(io_lib:format("~s",[?REDIS_HOST])).
|
||||||
io_lib:format(
|
|
||||||
"~s:~b",
|
|
||||||
[?REDIS_HOST, ?REDIS_PORT])).
|
|
||||||
|
|
||||||
q(Command) ->
|
q(Command) ->
|
||||||
emqx_resource:query(
|
emqx_resource:query(
|
||||||
|
@ -235,7 +231,7 @@ redis_config() ->
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
redis_type => single,
|
redis_type => single,
|
||||||
password => "public",
|
password => "public",
|
||||||
server => {?REDIS_HOST, ?REDIS_PORT},
|
server => {?REDIS_HOST, ?REDIS_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,35 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(VALID, emqx_resource_validator).
|
-define(VALID, emqx_resource_validator).
|
||||||
-define(NOT_EMPTY(MSG), ?VALID:not_empty(MSG)).
|
-define(NOT_EMPTY(MSG), ?VALID:not_empty(MSG)).
|
||||||
-define(MAX(MAXV), ?VALID:max(number, MAXV)).
|
-define(MAX(MAXV), ?VALID:max(number, MAXV)).
|
||||||
-define(MIN(MINV), ?VALID:min(number, MINV)).
|
-define(MIN(MINV), ?VALID:min(number, MINV)).
|
||||||
|
|
||||||
|
-define(MYSQL_DEFAULT_PORT, 3306).
|
||||||
|
-define(MONGO_DEFAULT_PORT, 27017).
|
||||||
|
-define(REDIS_DEFAULT_PORT, 6379).
|
||||||
|
-define(PGSQL_DEFAULT_PORT, 5432).
|
||||||
|
|
||||||
|
-define(SERVERS_DESC, "A Node list for Cluster to connect to. The nodes should be splited with ',', such as: 'Node[,Node]'<br>\nFor each Node should be:<br>").
|
||||||
|
|
||||||
|
-define(SERVER_DESC(TYPE, DEFAULT_PORT), """
|
||||||
|
The IPv4 or IPv6 address or host name to connect to.<br>
|
||||||
|
A host entry has the following form: 'Host[:Port]'<br>
|
||||||
|
The """ ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if '[:Port]' isn't present"
|
||||||
|
).
|
||||||
|
|
||||||
|
-define(THROW_ERROR(Str), erlang:throw({error, Str})).
|
||||||
|
|
|
@ -135,7 +135,7 @@ connect(Opts) ->
|
||||||
{ok, LDAP}.
|
{ok, LDAP}.
|
||||||
|
|
||||||
ldap_fields() ->
|
ldap_fields() ->
|
||||||
[ {servers, fun emqx_connector_schema_lib:servers/1}
|
[ {servers, fun servers/1}
|
||||||
, {port, fun port/1}
|
, {port, fun port/1}
|
||||||
, {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
, {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
||||||
, {bind_dn, fun bind_dn/1}
|
, {bind_dn, fun bind_dn/1}
|
||||||
|
@ -144,6 +144,11 @@ ldap_fields() ->
|
||||||
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
|
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
servers(type) -> list();
|
||||||
|
servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
|
||||||
|
servers(converter) -> fun to_servers_raw/1;
|
||||||
|
servers(_) -> undefined.
|
||||||
|
|
||||||
bind_dn(type) -> binary();
|
bind_dn(type) -> binary();
|
||||||
bind_dn(default) -> 0;
|
bind_dn(default) -> 0;
|
||||||
bind_dn(_) -> undefined.
|
bind_dn(_) -> undefined.
|
||||||
|
@ -154,3 +159,20 @@ port(_) -> undefined.
|
||||||
|
|
||||||
duration(type) -> emqx_schema:duration_ms();
|
duration(type) -> emqx_schema:duration_ms();
|
||||||
duration(_) -> undefined.
|
duration(_) -> undefined.
|
||||||
|
|
||||||
|
to_servers_raw(Servers) ->
|
||||||
|
{ok, lists:map( fun(Server) ->
|
||||||
|
case string:tokens(Server, ": ") of
|
||||||
|
[Ip] ->
|
||||||
|
[{host, Ip}];
|
||||||
|
[Ip, Port] ->
|
||||||
|
[{host, Ip}, {port, list_to_integer(Port)}]
|
||||||
|
end
|
||||||
|
end, string:tokens(str(Servers), ", "))}.
|
||||||
|
|
||||||
|
str(A) when is_atom(A) ->
|
||||||
|
atom_to_list(A);
|
||||||
|
str(B) when is_binary(B) ->
|
||||||
|
binary_to_list(B);
|
||||||
|
str(S) when is_list(S) ->
|
||||||
|
S.
|
||||||
|
|
|
@ -20,10 +20,6 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-type server() :: emqx_schema:ip_port().
|
|
||||||
-reflect_type([server/0]).
|
|
||||||
-typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}).
|
|
||||||
|
|
||||||
-behaviour(emqx_resource).
|
-behaviour(emqx_resource).
|
||||||
|
|
||||||
%% callbacks of behaviour emqx_resource
|
%% callbacks of behaviour emqx_resource
|
||||||
|
@ -42,13 +38,18 @@
|
||||||
|
|
||||||
-define(HEALTH_CHECK_TIMEOUT, 10000).
|
-define(HEALTH_CHECK_TIMEOUT, 10000).
|
||||||
|
|
||||||
|
%% mongo servers don't need parse
|
||||||
|
-define( MONGO_HOST_OPTIONS
|
||||||
|
, #{ host_type => hostname
|
||||||
|
, default_port => ?MONGO_DEFAULT_PORT}).
|
||||||
|
|
||||||
%%=====================================================================
|
%%=====================================================================
|
||||||
roots() ->
|
roots() ->
|
||||||
[ {config, #{type => hoconsc:union(
|
[ {config, #{type => hoconsc:union(
|
||||||
[ hoconsc:ref(?MODULE, single)
|
[ hoconsc:ref(?MODULE, single)
|
||||||
, hoconsc:ref(?MODULE, rs)
|
, hoconsc:ref(?MODULE, rs)
|
||||||
, hoconsc:ref(?MODULE, sharded)
|
, hoconsc:ref(?MODULE, sharded)
|
||||||
])}}
|
])}}
|
||||||
].
|
].
|
||||||
|
|
||||||
fields(single) ->
|
fields(single) ->
|
||||||
|
@ -284,12 +285,21 @@ init_worker_options([_ | R], Acc) ->
|
||||||
init_worker_options(R, Acc);
|
init_worker_options(R, Acc);
|
||||||
init_worker_options([], Acc) -> Acc.
|
init_worker_options([], Acc) -> Acc.
|
||||||
|
|
||||||
server(type) -> binary();
|
%% ===================================================================
|
||||||
|
%% Schema funcs
|
||||||
|
|
||||||
|
server(type) -> emqx_schema:ip_port();
|
||||||
|
server(nullable) -> false;
|
||||||
server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
|
server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
|
||||||
|
server(converter) -> fun to_server_raw/1;
|
||||||
|
server(desc) -> ?SERVER_DESC("MongoDB", integer_to_list(?MONGO_DEFAULT_PORT));
|
||||||
server(_) -> undefined.
|
server(_) -> undefined.
|
||||||
|
|
||||||
servers(type) -> binary();
|
servers(type) -> binary();
|
||||||
|
servers(nullable) -> false;
|
||||||
servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
|
servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
|
||||||
|
servers(converter) -> fun to_servers_raw/1;
|
||||||
|
servers(desc) -> ?SERVERS_DESC ++ server(desc);
|
||||||
servers(_) -> undefined.
|
servers(_) -> undefined.
|
||||||
|
|
||||||
w_mode(type) -> hoconsc:enum([unsafe, safe]);
|
w_mode(type) -> hoconsc:enum([unsafe, safe]);
|
||||||
|
@ -312,19 +322,12 @@ srv_record(type) -> boolean();
|
||||||
srv_record(default) -> false;
|
srv_record(default) -> false;
|
||||||
srv_record(_) -> undefined.
|
srv_record(_) -> undefined.
|
||||||
|
|
||||||
parse_servers(Type, Servers) when is_binary(Servers) ->
|
%% ===================================================================
|
||||||
parse_servers(Type, binary_to_list(Servers));
|
%% Internal funcs
|
||||||
parse_servers(Type, Servers) when is_list(Servers) ->
|
|
||||||
case string:split(Servers, ",", all) of
|
|
||||||
[Host | _] when Type =:= single ->
|
|
||||||
[Host];
|
|
||||||
Hosts ->
|
|
||||||
Hosts
|
|
||||||
end.
|
|
||||||
|
|
||||||
may_parse_srv_and_txt_records(#{server := Server} = Config) ->
|
may_parse_srv_and_txt_records(#{server := Server} = Config) ->
|
||||||
NConfig = maps:remove(server, Config),
|
NConfig = maps:remove(server, Config),
|
||||||
may_parse_srv_and_txt_records_(NConfig#{servers => Server});
|
may_parse_srv_and_txt_records_(NConfig#{servers => [Server]});
|
||||||
may_parse_srv_and_txt_records(Config) ->
|
may_parse_srv_and_txt_records(Config) ->
|
||||||
may_parse_srv_and_txt_records_(Config).
|
may_parse_srv_and_txt_records_(Config).
|
||||||
|
|
||||||
|
@ -335,47 +338,52 @@ may_parse_srv_and_txt_records_(#{mongo_type := Type,
|
||||||
true ->
|
true ->
|
||||||
error({missing_parameter, replica_set_name});
|
error({missing_parameter, replica_set_name});
|
||||||
false ->
|
false ->
|
||||||
Config#{hosts => parse_servers(Type, Servers)}
|
Config#{hosts => servers_to_bin(Servers)}
|
||||||
end;
|
end;
|
||||||
may_parse_srv_and_txt_records_(#{mongo_type := Type,
|
may_parse_srv_and_txt_records_(#{mongo_type := Type,
|
||||||
srv_record := true,
|
srv_record := true,
|
||||||
servers := Servers} = Config) ->
|
servers := Servers} = Config) ->
|
||||||
NServers = binary_to_list(Servers),
|
Hosts = parse_srv_records(Type, Servers),
|
||||||
Hosts = parse_srv_records(Type, NServers),
|
ExtraOpts = parse_txt_records(Type, Servers),
|
||||||
ExtraOpts = parse_txt_records(Type, NServers),
|
|
||||||
maps:merge(Config#{hosts => Hosts}, ExtraOpts).
|
maps:merge(Config#{hosts => Hosts}, ExtraOpts).
|
||||||
|
|
||||||
parse_srv_records(Type, Server) ->
|
parse_srv_records(Type, Servers) ->
|
||||||
case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of
|
Fun = fun(AccIn, {IpOrHost, _Port}) ->
|
||||||
[] ->
|
case inet_res:lookup("_mongodb._tcp." ++ ip_or_host_to_string(IpOrHost), in, srv) of
|
||||||
error(service_not_found);
|
[] ->
|
||||||
Services ->
|
error(service_not_found);
|
||||||
case [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] of
|
Services ->
|
||||||
[H | _] when Type =:= single ->
|
[ [server_to_bin({Host, Port}) || {_, _, Port, Host} <- Services]
|
||||||
[H];
|
| AccIn]
|
||||||
Hosts ->
|
end
|
||||||
Hosts
|
end,
|
||||||
end
|
Res = lists:foldl(Fun, [], Servers),
|
||||||
|
case Type of
|
||||||
|
single -> lists:nth(1, Res);
|
||||||
|
_ -> Res
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_txt_records(Type, Server) ->
|
parse_txt_records(Type, Servers) ->
|
||||||
case inet_res:lookup(Server, in, txt) of
|
Fun = fun(AccIn, {IpOrHost, _Port}) ->
|
||||||
[] ->
|
case inet_res:lookup(IpOrHost, in, txt) of
|
||||||
#{};
|
[] ->
|
||||||
[[QueryString]] ->
|
#{};
|
||||||
case uri_string:dissect_query(QueryString) of
|
[[QueryString]] ->
|
||||||
{error, _, _} ->
|
case uri_string:dissect_query(QueryString) of
|
||||||
error({invalid_txt_record, invalid_query_string});
|
{error, _, _} ->
|
||||||
Options ->
|
error({invalid_txt_record, invalid_query_string});
|
||||||
Fields = case Type of
|
Options ->
|
||||||
rs -> ["authSource", "replicaSet"];
|
Fields = case Type of
|
||||||
_ -> ["authSource"]
|
rs -> ["authSource", "replicaSet"];
|
||||||
end,
|
_ -> ["authSource"]
|
||||||
take_and_convert(Fields, Options)
|
end,
|
||||||
end;
|
maps:merge(AccIn, take_and_convert(Fields, Options))
|
||||||
_ ->
|
end;
|
||||||
error({invalid_txt_record, multiple_records})
|
_ ->
|
||||||
end.
|
error({invalid_txt_record, multiple_records})
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
lists:foldl(Fun, #{}, Servers).
|
||||||
|
|
||||||
take_and_convert(Fields, Options) ->
|
take_and_convert(Fields, Options) ->
|
||||||
take_and_convert(Fields, Options, #{}).
|
take_and_convert(Fields, Options, #{}).
|
||||||
|
@ -395,3 +403,41 @@ take_and_convert([Field | More], Options, Acc) ->
|
||||||
false ->
|
false ->
|
||||||
take_and_convert(More, Options, Acc)
|
take_and_convert(More, Options, Acc)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec ip_or_host_to_string(binary() | string() | tuple())
|
||||||
|
-> string().
|
||||||
|
ip_or_host_to_string(Ip) when is_tuple(Ip) ->
|
||||||
|
inet:ntoa(Ip);
|
||||||
|
ip_or_host_to_string(Host) ->
|
||||||
|
str(Host).
|
||||||
|
|
||||||
|
servers_to_bin([Server | Rest]) ->
|
||||||
|
[server_to_bin(Server) | servers_to_bin(Rest)];
|
||||||
|
servers_to_bin([]) ->
|
||||||
|
[].
|
||||||
|
|
||||||
|
server_to_bin({IpOrHost, Port}) ->
|
||||||
|
iolist_to_binary(ip_or_host_to_string(IpOrHost) ++ ":" ++ integer_to_list(Port)).
|
||||||
|
|
||||||
|
%% ===================================================================
|
||||||
|
%% typereflt funcs
|
||||||
|
|
||||||
|
-spec to_server_raw(string())
|
||||||
|
-> {string(), pos_integer()}.
|
||||||
|
to_server_raw(Server) ->
|
||||||
|
emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS).
|
||||||
|
|
||||||
|
-spec to_servers_raw(string())
|
||||||
|
-> [{string(), pos_integer()}].
|
||||||
|
to_servers_raw(Servers) ->
|
||||||
|
lists:map( fun(Server) ->
|
||||||
|
emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS)
|
||||||
|
end
|
||||||
|
, string:tokens(str(Servers), ", ")).
|
||||||
|
|
||||||
|
str(A) when is_atom(A) ->
|
||||||
|
atom_to_list(A);
|
||||||
|
str(B) when is_binary(B) ->
|
||||||
|
binary_to_list(B);
|
||||||
|
str(S) when is_list(S) ->
|
||||||
|
S.
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_connector_mysql).
|
-module(emqx_connector_mysql).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
@ -33,15 +34,28 @@
|
||||||
|
|
||||||
-export([do_health_check/1]).
|
-export([do_health_check/1]).
|
||||||
|
|
||||||
|
-define( MYSQL_HOST_OPTIONS
|
||||||
|
, #{ host_type => inet_addr
|
||||||
|
, default_port => ?MYSQL_DEFAULT_PORT}).
|
||||||
|
|
||||||
%%=====================================================================
|
%%=====================================================================
|
||||||
%% Hocon schema
|
%% Hocon schema
|
||||||
roots() ->
|
roots() ->
|
||||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||||
|
|
||||||
fields(config) ->
|
fields(config) ->
|
||||||
|
[ {server, fun server/1}
|
||||||
|
] ++
|
||||||
emqx_connector_schema_lib:relational_db_fields() ++
|
emqx_connector_schema_lib:relational_db_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
|
server(type) -> emqx_schema:ip_port();
|
||||||
|
server(nullable) -> false;
|
||||||
|
server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
|
||||||
|
server(converter) -> fun to_server/1;
|
||||||
|
server(desc) -> ?SERVER_DESC("MySQL", integer_to_list(?MYSQL_DEFAULT_PORT));
|
||||||
|
server(_) -> undefined.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
on_start(InstId, #{server := {Host, Port},
|
on_start(InstId, #{server := {Host, Port},
|
||||||
database := DB,
|
database := DB,
|
||||||
|
@ -106,3 +120,8 @@ reconn_interval(false) -> false.
|
||||||
|
|
||||||
connect(Options) ->
|
connect(Options) ->
|
||||||
mysql:start_link(Options).
|
mysql:start_link(Options).
|
||||||
|
|
||||||
|
-spec to_server(string())
|
||||||
|
-> {inet:ip_address() | inet:hostname(), pos_integer()}.
|
||||||
|
to_server(Str) ->
|
||||||
|
emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS).
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_connector_pgsql).
|
-module(emqx_connector_pgsql).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("epgsql/include/epgsql.hrl").
|
-include_lib("epgsql/include/epgsql.hrl").
|
||||||
|
@ -38,13 +39,19 @@
|
||||||
|
|
||||||
-export([do_health_check/1]).
|
-export([do_health_check/1]).
|
||||||
|
|
||||||
|
-define( PGSQL_HOST_OPTIONS
|
||||||
|
, #{ host_type => inet_addr
|
||||||
|
, default_port => ?PGSQL_DEFAULT_PORT}).
|
||||||
|
|
||||||
|
|
||||||
%%=====================================================================
|
%%=====================================================================
|
||||||
|
|
||||||
roots() ->
|
roots() ->
|
||||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||||
|
|
||||||
fields(config) ->
|
fields(config) ->
|
||||||
[{named_queries, fun named_queries/1}] ++
|
[ {named_queries, fun named_queries/1}
|
||||||
|
, {server, fun server/1}] ++
|
||||||
emqx_connector_schema_lib:relational_db_fields() ++
|
emqx_connector_schema_lib:relational_db_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
|
@ -52,6 +59,13 @@ named_queries(type) -> map();
|
||||||
named_queries(nullable) -> true;
|
named_queries(nullable) -> true;
|
||||||
named_queries(_) -> undefined.
|
named_queries(_) -> undefined.
|
||||||
|
|
||||||
|
server(type) -> emqx_schema:ip_port();
|
||||||
|
server(nullable) -> false;
|
||||||
|
server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
|
||||||
|
server(converter) -> fun to_server/1;
|
||||||
|
server(desc) -> ?SERVER_DESC("PostgreSQL", integer_to_list(?PGSQL_DEFAULT_PORT));
|
||||||
|
server(_) -> undefined.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
on_start(InstId, #{server := {Host, Port},
|
on_start(InstId, #{server := {Host, Port},
|
||||||
database := DB,
|
database := DB,
|
||||||
|
@ -163,3 +177,11 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
|
||||||
conn_opts(Opts, [Opt | Acc]);
|
conn_opts(Opts, [Opt | Acc]);
|
||||||
conn_opts([_Opt | Opts], Acc) ->
|
conn_opts([_Opt | Opts], Acc) ->
|
||||||
conn_opts(Opts, Acc).
|
conn_opts(Opts, Acc).
|
||||||
|
|
||||||
|
%% ===================================================================
|
||||||
|
%% typereflt funcs
|
||||||
|
|
||||||
|
-spec to_server(string())
|
||||||
|
-> {inet:ip_address() | inet:hostname(), pos_integer()}.
|
||||||
|
to_server(Str) ->
|
||||||
|
emqx_connector_schema_lib:parse_server(Str, ?PGSQL_HOST_OPTIONS).
|
||||||
|
|
|
@ -19,21 +19,6 @@
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-type server() :: tuple().
|
|
||||||
%% {"127.0.0.1", 7000}
|
|
||||||
%% For eredis:start_link/1~7
|
|
||||||
-reflect_type([server/0]).
|
|
||||||
-typerefl_from_string({server/0, ?MODULE, to_server}).
|
|
||||||
|
|
||||||
-type servers() :: list().
|
|
||||||
%% [{"127.0.0.1", 7000}, {"127.0.0.2", 7000}]
|
|
||||||
%% For eredis_cluster
|
|
||||||
-reflect_type([servers/0]).
|
|
||||||
-typerefl_from_string({servers/0, ?MODULE, to_servers}).
|
|
||||||
|
|
||||||
-export([ to_server/1
|
|
||||||
, to_servers/1]).
|
|
||||||
|
|
||||||
-export([roots/0, fields/1]).
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
-behaviour(emqx_resource).
|
-behaviour(emqx_resource).
|
||||||
|
@ -51,6 +36,12 @@
|
||||||
|
|
||||||
-export([cmd/3]).
|
-export([cmd/3]).
|
||||||
|
|
||||||
|
%% redis host don't need parse
|
||||||
|
-define( REDIS_HOST_OPTIONS
|
||||||
|
, #{ host_type => hostname
|
||||||
|
, default_port => ?REDIS_DEFAULT_PORT}).
|
||||||
|
|
||||||
|
|
||||||
%%=====================================================================
|
%%=====================================================================
|
||||||
roots() ->
|
roots() ->
|
||||||
[ {config, #{type => hoconsc:union(
|
[ {config, #{type => hoconsc:union(
|
||||||
|
@ -62,21 +53,21 @@ roots() ->
|
||||||
].
|
].
|
||||||
|
|
||||||
fields(single) ->
|
fields(single) ->
|
||||||
[ {server, #{type => server()}}
|
[ {server, fun server/1}
|
||||||
, {redis_type, #{type => hoconsc:enum([single]),
|
, {redis_type, #{type => hoconsc:enum([single]),
|
||||||
default => single}}
|
default => single}}
|
||||||
] ++
|
] ++
|
||||||
redis_fields() ++
|
redis_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields();
|
emqx_connector_schema_lib:ssl_fields();
|
||||||
fields(cluster) ->
|
fields(cluster) ->
|
||||||
[ {servers, #{type => servers()}}
|
[ {servers, fun servers/1}
|
||||||
, {redis_type, #{type => hoconsc:enum([cluster]),
|
, {redis_type, #{type => hoconsc:enum([cluster]),
|
||||||
default => cluster}}
|
default => cluster}}
|
||||||
] ++
|
] ++
|
||||||
redis_fields() ++
|
redis_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields();
|
emqx_connector_schema_lib:ssl_fields();
|
||||||
fields(sentinel) ->
|
fields(sentinel) ->
|
||||||
[ {servers, #{type => servers()}}
|
[ {servers, fun servers/1}
|
||||||
, {redis_type, #{type => hoconsc:enum([sentinel]),
|
, {redis_type, #{type => hoconsc:enum([sentinel]),
|
||||||
default => sentinel}}
|
default => sentinel}}
|
||||||
, {sentinel, #{type => string()}}
|
, {sentinel, #{type => string()}}
|
||||||
|
@ -84,6 +75,20 @@ fields(sentinel) ->
|
||||||
redis_fields() ++
|
redis_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
|
server(type) -> emqx_schema:ip_port();
|
||||||
|
server(nullable) -> false;
|
||||||
|
server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
|
||||||
|
server(converter) -> fun to_server_raw/1;
|
||||||
|
server(desc) -> ?SERVER_DESC("Redis", integer_to_list(?REDIS_DEFAULT_PORT));
|
||||||
|
server(_) -> undefined.
|
||||||
|
|
||||||
|
servers(type) -> list();
|
||||||
|
servers(nullable) -> false;
|
||||||
|
servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
|
||||||
|
servers(converter) -> fun to_servers_raw/1;
|
||||||
|
servers(desc) -> ?SERVERS_DESC ++ server(desc);
|
||||||
|
servers(_) -> undefined.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
on_start(InstId, #{redis_type := Type,
|
on_start(InstId, #{redis_type := Type,
|
||||||
database := Database,
|
database := Database,
|
||||||
|
@ -185,24 +190,22 @@ redis_fields() ->
|
||||||
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
|
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
to_server(Server) ->
|
-spec to_server_raw(string())
|
||||||
try {ok, parse_server(Server)}
|
-> {string(), pos_integer()}.
|
||||||
catch
|
to_server_raw(Server) ->
|
||||||
throw : Error ->
|
emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS).
|
||||||
Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
to_servers(Servers) ->
|
-spec to_servers_raw(string())
|
||||||
try {ok, lists:map(fun parse_server/1, string:tokens(Servers, ", "))}
|
-> [{string(), pos_integer()}].
|
||||||
catch
|
to_servers_raw(Servers) ->
|
||||||
throw : _Reason ->
|
lists:map( fun(Server) ->
|
||||||
{error, Servers}
|
emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS)
|
||||||
end.
|
end
|
||||||
|
, string:tokens(str(Servers), ", ")).
|
||||||
|
|
||||||
parse_server(Server) ->
|
str(A) when is_atom(A) ->
|
||||||
case string:tokens(Server, ": ") of
|
atom_to_list(A);
|
||||||
[Host, Port] ->
|
str(B) when is_binary(B) ->
|
||||||
{Host, list_to_integer(Port)};
|
binary_to_list(B);
|
||||||
_ ->
|
str(S) when is_list(S) ->
|
||||||
throw({error, Server})
|
S.
|
||||||
end.
|
|
||||||
|
|
|
@ -1,3 +1,18 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_connector_schema).
|
-module(emqx_connector_schema).
|
||||||
|
|
||||||
-behaviour(hocon_schema).
|
-behaviour(hocon_schema).
|
||||||
|
|
|
@ -22,34 +22,27 @@
|
||||||
, ssl_fields/0
|
, ssl_fields/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ to_ip_port/1
|
-export([ ip_port_to_string/1
|
||||||
, ip_port_to_string/1
|
, parse_server/2
|
||||||
, to_servers/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ pool_size/1
|
-export([ pool_size/1
|
||||||
, database/1
|
, database/1
|
||||||
, username/1
|
, username/1
|
||||||
, password/1
|
, password/1
|
||||||
, servers/1
|
|
||||||
, auto_reconnect/1
|
, auto_reconnect/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}).
|
|
||||||
-typerefl_from_string({servers/0, emqx_connector_schema_lib, to_servers}).
|
|
||||||
|
|
||||||
-type database() :: binary().
|
-type database() :: binary().
|
||||||
-type pool_size() :: integer().
|
-type pool_size() :: integer().
|
||||||
-type username() :: binary().
|
-type username() :: binary().
|
||||||
-type password() :: binary().
|
-type password() :: binary().
|
||||||
-type servers() :: list().
|
|
||||||
|
|
||||||
-reflect_type([ database/0
|
-reflect_type([ database/0
|
||||||
, pool_size/0
|
, pool_size/0
|
||||||
, username/0
|
, username/0
|
||||||
, password/0
|
, password/0
|
||||||
, servers/0
|
]).
|
||||||
]).
|
|
||||||
|
|
||||||
-export([roots/0, fields/1]).
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
|
@ -65,19 +58,13 @@ ssl_fields() ->
|
||||||
].
|
].
|
||||||
|
|
||||||
relational_db_fields() ->
|
relational_db_fields() ->
|
||||||
[ {server, fun server/1}
|
[ {database, fun database/1}
|
||||||
, {database, fun database/1}
|
|
||||||
, {pool_size, fun pool_size/1}
|
, {pool_size, fun pool_size/1}
|
||||||
, {username, fun username/1}
|
, {username, fun username/1}
|
||||||
, {password, fun password/1}
|
, {password, fun password/1}
|
||||||
, {auto_reconnect, fun auto_reconnect/1}
|
, {auto_reconnect, fun auto_reconnect/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
server(type) -> emqx_schema:ip_port();
|
|
||||||
server(nullable) -> false;
|
|
||||||
server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
|
|
||||||
server(_) -> undefined.
|
|
||||||
|
|
||||||
database(type) -> binary();
|
database(type) -> binary();
|
||||||
database(nullable) -> false;
|
database(nullable) -> false;
|
||||||
database(validator) -> [?NOT_EMPTY("the value of the field 'database' cannot be empty")];
|
database(validator) -> [?NOT_EMPTY("the value of the field 'database' cannot be empty")];
|
||||||
|
@ -100,31 +87,59 @@ auto_reconnect(type) -> boolean();
|
||||||
auto_reconnect(default) -> true;
|
auto_reconnect(default) -> true;
|
||||||
auto_reconnect(_) -> undefined.
|
auto_reconnect(_) -> undefined.
|
||||||
|
|
||||||
servers(type) -> servers();
|
|
||||||
servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
|
|
||||||
servers(_) -> undefined.
|
|
||||||
|
|
||||||
to_ip_port(Str) ->
|
|
||||||
case string:tokens(Str, ": ") of
|
|
||||||
[Ip, Port] ->
|
|
||||||
case inet:parse_address(Ip) of
|
|
||||||
{ok, R} -> {ok, {R, list_to_integer(Port)}};
|
|
||||||
_ -> {error, Str}
|
|
||||||
end;
|
|
||||||
_ -> {error, Str}
|
|
||||||
end.
|
|
||||||
|
|
||||||
ip_port_to_string({Ip, Port}) when is_list(Ip) ->
|
ip_port_to_string({Ip, Port}) when is_list(Ip) ->
|
||||||
iolist_to_binary([Ip, ":", integer_to_list(Port)]);
|
iolist_to_binary([Ip, ":", integer_to_list(Port)]);
|
||||||
ip_port_to_string({Ip, Port}) when is_tuple(Ip) ->
|
ip_port_to_string({Ip, Port}) when is_tuple(Ip) ->
|
||||||
iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]).
|
iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]).
|
||||||
|
|
||||||
to_servers(Str) ->
|
parse_server(Str, #{host_type := inet_addr, default_port := DefaultPort}) ->
|
||||||
{ok, lists:map(fun(Server) ->
|
try string:tokens(str(Str), ": ") of
|
||||||
case string:tokens(Server, ": ") of
|
[Ip, Port] ->
|
||||||
[Ip] ->
|
case parse_ip(Ip) of
|
||||||
[{host, Ip}];
|
{ok, R} -> {R, list_to_integer(Port)}
|
||||||
[Ip, Port] ->
|
end;
|
||||||
[{host, Ip}, {port, list_to_integer(Port)}]
|
[Ip] ->
|
||||||
end
|
case parse_ip(Ip) of
|
||||||
end, string:tokens(Str, " , "))}.
|
{ok, R} -> {R, DefaultPort}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
?THROW_ERROR("Bad server schema.")
|
||||||
|
catch
|
||||||
|
error : Reason ->
|
||||||
|
?THROW_ERROR(Reason)
|
||||||
|
end;
|
||||||
|
parse_server(Str, #{host_type := hostname, default_port := DefaultPort}) ->
|
||||||
|
try string:tokens(str(Str), ": ") of
|
||||||
|
[Ip, Port] ->
|
||||||
|
{Ip, list_to_integer(Port)};
|
||||||
|
[Ip] ->
|
||||||
|
{Ip, DefaultPort};
|
||||||
|
_ ->
|
||||||
|
?THROW_ERROR("Bad server schema.")
|
||||||
|
catch
|
||||||
|
error : Reason ->
|
||||||
|
?THROW_ERROR(Reason)
|
||||||
|
end;
|
||||||
|
parse_server(_, _) ->
|
||||||
|
?THROW_ERROR("Invalid Host").
|
||||||
|
|
||||||
|
parse_ip(Str) ->
|
||||||
|
case inet:parse_address(Str) of
|
||||||
|
{ok, R} ->
|
||||||
|
{ok, R};
|
||||||
|
_ ->
|
||||||
|
%% check is a rfc1035's hostname
|
||||||
|
case inet_parse:domain(Str) of
|
||||||
|
true ->
|
||||||
|
{ok, Str};
|
||||||
|
_ ->
|
||||||
|
?THROW_ERROR("Bad IP or Host")
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
str(A) when is_atom(A) ->
|
||||||
|
atom_to_list(A);
|
||||||
|
str(B) when is_binary(B) ->
|
||||||
|
binary_to_list(B);
|
||||||
|
str(S) when is_list(S) ->
|
||||||
|
S.
|
||||||
|
|
|
@ -18,12 +18,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
-define(MONGO_HOST, "mongo").
|
-define(MONGO_HOST, "mongo").
|
||||||
-define(MONGO_PORT, 27017).
|
|
||||||
-define(MONGO_CLIENT, 'emqx_connector_mongo_SUITE_client').
|
-define(MONGO_CLIENT, 'emqx_connector_mongo_SUITE_client').
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -33,7 +33,7 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_connector_test_helpers:start_apps([ecpool, mongodb]),
|
ok = emqx_connector_test_helpers:start_apps([ecpool, mongodb]),
|
||||||
Config;
|
Config;
|
||||||
|
@ -129,5 +129,5 @@ mongo_config() ->
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
ssl => #{enable => false},
|
ssl => #{enable => false},
|
||||||
srv_record => false,
|
srv_record => false,
|
||||||
servers => <<"127.0.0.1:27017">>
|
server => {<<?MONGO_HOST>>, ?MONGO_DEFAULT_PORT}
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -18,12 +18,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
-define(MYSQL_HOST, "mysql").
|
-define(MYSQL_HOST, "mysql").
|
||||||
-define(MYSQL_PORT, 3306).
|
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
@ -32,15 +32,16 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([ecpool, mysql]),
|
||||||
Config;
|
Config;
|
||||||
false ->
|
false ->
|
||||||
{skip, no_mysql}
|
{skip, no_mysql}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok = emqx_connector_test_helpers:stop_apps([ecpool, mysql]).
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -124,7 +125,7 @@ mysql_config() ->
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
server => {?MYSQL_HOST, ?MYSQL_PORT},
|
server => {?MYSQL_HOST, ?MYSQL_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -18,12 +18,12 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("emqx_connector.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
-define(PGSQL_HOST, "pgsql").
|
-define(PGSQL_HOST, "pgsql").
|
||||||
-define(PGSQL_PORT, 5432).
|
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
@ -32,15 +32,16 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
|
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
|
||||||
true ->
|
true ->
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([ecpool, pgsql]),
|
||||||
Config;
|
Config;
|
||||||
false ->
|
false ->
|
||||||
{skip, no_pgsql}
|
{skip, no_pgsql}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok = emqx_connector_test_helpers:stop_apps([ecpool, pgsql]).
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -124,7 +125,7 @@ pgsql_config() ->
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
server => {?PGSQL_HOST, ?PGSQL_PORT},
|
server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -135,7 +136,7 @@ pgsql_bad_config() ->
|
||||||
username => <<"bad_root">>,
|
username => <<"bad_root">>,
|
||||||
password => <<"bad_public">>,
|
password => <<"bad_public">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
server => {?PGSQL_HOST, ?PGSQL_PORT},
|
server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT},
|
||||||
ssl => #{enable => false}
|
ssl => #{enable => false}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -445,8 +445,6 @@ typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/fil
|
||||||
typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>};
|
typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>};
|
||||||
typename_to_spec("ip_ports()", _Mod) -> #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>};
|
typename_to_spec("ip_ports()", _Mod) -> #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>};
|
||||||
typename_to_spec("url()", _Mod) -> #{type => string, example => <<"http://127.0.0.1">>};
|
typename_to_spec("url()", _Mod) -> #{type => string, example => <<"http://127.0.0.1">>};
|
||||||
typename_to_spec("server()", Mod) -> typename_to_spec("ip_port()", Mod);
|
|
||||||
typename_to_spec("servers()", Mod) -> typename_to_spec("ip_ports()", Mod);
|
|
||||||
typename_to_spec("connect_timeout()", Mod) -> typename_to_spec("timeout()", Mod);
|
typename_to_spec("connect_timeout()", Mod) -> typename_to_spec("timeout()", Mod);
|
||||||
typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, example => infinity},
|
typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, example => infinity},
|
||||||
#{type => integer, example => 100}], example => infinity};
|
#{type => integer, example => 100}], example => infinity};
|
||||||
|
|
|
@ -361,7 +361,7 @@ schema("/ref/complicated_type") ->
|
||||||
200 => [
|
200 => [
|
||||||
{no_neg_integer, hoconsc:mk(non_neg_integer(), #{})},
|
{no_neg_integer, hoconsc:mk(non_neg_integer(), #{})},
|
||||||
{url, hoconsc:mk(emqx_connector_http:url(), #{})},
|
{url, hoconsc:mk(emqx_connector_http:url(), #{})},
|
||||||
{server, hoconsc:mk(emqx_connector_redis:server(), #{})},
|
{server, hoconsc:mk(emqx_schema:ip_port(), #{})},
|
||||||
{connect_timeout, hoconsc:mk(emqx_connector_http:connect_timeout(), #{})},
|
{connect_timeout, hoconsc:mk(emqx_connector_http:connect_timeout(), #{})},
|
||||||
{pool_type, hoconsc:mk(emqx_connector_http:pool_type(), #{})},
|
{pool_type, hoconsc:mk(emqx_connector_http:pool_type(), #{})},
|
||||||
{timeout, hoconsc:mk(timeout(), #{})},
|
{timeout, hoconsc:mk(timeout(), #{})},
|
||||||
|
|
Loading…
Reference in New Issue