chore(connector): update connector ssl schema
This commit is contained in:
parent
c9acf423ba
commit
2b082f9cf9
|
@ -11,9 +11,11 @@ services:
|
||||||
MYSQL_USER: ssluser
|
MYSQL_USER: ssluser
|
||||||
MYSQL_PASSWORD: public
|
MYSQL_PASSWORD: public
|
||||||
volumes:
|
volumes:
|
||||||
- ../../apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/ca.pem:/etc/certs/ca-cert.pem
|
- ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca-cert.pem
|
||||||
- ../../apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/server-cert.pem:/etc/certs/server-cert.pem
|
- ../../apps/emqx/etc/certs/cert.pem:/etc/certs/server-cert.pem
|
||||||
- ../../apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/server-key.pem:/etc/certs/server-key.pem
|
- ../../apps/emqx/etc/certs/key.pem:/etc/certs/server-key.pem
|
||||||
|
ports:
|
||||||
|
- "3306:3306"
|
||||||
networks:
|
networks:
|
||||||
- emqx_bridge
|
- emqx_bridge
|
||||||
command:
|
command:
|
||||||
|
|
|
@ -5,7 +5,9 @@ services:
|
||||||
container_name: redis
|
container_name: redis
|
||||||
image: redis:${REDIS_TAG}
|
image: redis:${REDIS_TAG}
|
||||||
volumes:
|
volumes:
|
||||||
- ../../apps/emqx_auth_redis/test/emqx_auth_redis_SUITE_data/certs:/tls
|
- ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca.crt
|
||||||
|
- ../../apps/emqx/etc/certs/cert.pem:/etc/certs/redis.crt
|
||||||
|
- ../../apps/emqx/etc/certs/key.pem:/etc/certs/redis.key
|
||||||
- ./redis/:/data/conf
|
- ./redis/:/data/conf
|
||||||
command: bash -c "/bin/bash /data/conf/redis.sh --node cluster --tls-enabled && tail -f /var/log/redis-server.log"
|
command: bash -c "/bin/bash /data/conf/redis.sh --node cluster --tls-enabled && tail -f /var/log/redis-server.log"
|
||||||
networks:
|
networks:
|
||||||
|
|
|
@ -5,7 +5,9 @@ services:
|
||||||
container_name: redis
|
container_name: redis
|
||||||
image: redis:${REDIS_TAG}
|
image: redis:${REDIS_TAG}
|
||||||
volumes:
|
volumes:
|
||||||
- ../../apps/emqx_auth_redis/test/emqx_auth_redis_SUITE_data/certs:/tls
|
- ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca.crt
|
||||||
|
- ../../apps/emqx/etc/certs/cert.pem:/etc/certs/redis.crt
|
||||||
|
- ../../apps/emqx/etc/certs/key.pem:/etc/certs/redis.key
|
||||||
- ./redis/:/data/conf
|
- ./redis/:/data/conf
|
||||||
command: bash -c "/bin/bash /data/conf/redis.sh --node sentinel --tls-enabled && tail -f /var/log/redis-server.log"
|
command: bash -c "/bin/bash /data/conf/redis.sh --node sentinel --tls-enabled && tail -f /var/log/redis-server.log"
|
||||||
networks:
|
networks:
|
||||||
|
|
|
@ -5,15 +5,17 @@ services:
|
||||||
container_name: redis
|
container_name: redis
|
||||||
image: redis:${REDIS_TAG}
|
image: redis:${REDIS_TAG}
|
||||||
volumes:
|
volumes:
|
||||||
- ../../apps/emqx_auth_redis/test/emqx_auth_redis_SUITE_data/certs:/tls
|
- ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca.crt
|
||||||
|
- ../../apps/emqx/etc/certs/cert.pem:/etc/certs/redis.crt
|
||||||
|
- ../../apps/emqx/etc/certs/key.pem:/etc/certs/redis.key
|
||||||
command:
|
command:
|
||||||
- redis-server
|
- redis-server
|
||||||
- "--bind 0.0.0.0 ::"
|
- "--bind 0.0.0.0 ::"
|
||||||
- --requirepass public
|
- --requirepass public
|
||||||
- --tls-port 6380
|
- --tls-port 6380
|
||||||
- --tls-cert-file /tls/redis.crt
|
- --tls-cert-file /etc/certs/redis.crt
|
||||||
- --tls-key-file /tls/redis.key
|
- --tls-key-file /etc/certs/redis.key
|
||||||
- --tls-ca-cert-file /tls/ca.crt
|
- --tls-ca-cert-file /etc/certs/ca.crt
|
||||||
restart: always
|
restart: always
|
||||||
networks:
|
networks:
|
||||||
- emqx_bridge
|
- emqx_bridge
|
||||||
|
|
|
@ -2,9 +2,9 @@ ARG BUILD_FROM=postgres:11
|
||||||
FROM ${BUILD_FROM}
|
FROM ${BUILD_FROM}
|
||||||
ARG POSTGRES_USER=postgres
|
ARG POSTGRES_USER=postgres
|
||||||
COPY --chown=$POSTGRES_USER .ci/docker-compose-file/pgsql/pg_hba.conf /var/lib/postgresql/pg_hba.conf
|
COPY --chown=$POSTGRES_USER .ci/docker-compose-file/pgsql/pg_hba.conf /var/lib/postgresql/pg_hba.conf
|
||||||
COPY --chown=$POSTGRES_USER apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE_data/server-key.pem /var/lib/postgresql/server.key
|
COPY --chown=$POSTGRES_USER apps/emqx/etc/certs/key.pem /var/lib/postgresql/server.key
|
||||||
COPY --chown=$POSTGRES_USER apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE_data/server-cert.pem /var/lib/postgresql/server.crt
|
COPY --chown=$POSTGRES_USER apps/emqx/etc/certs/cert.pem /var/lib/postgresql/server.crt
|
||||||
COPY --chown=$POSTGRES_USER apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE_data/ca.pem /var/lib/postgresql/root.crt
|
COPY --chown=$POSTGRES_USER apps/emqx/etc/certs/cacert.pem /var/lib/postgresql/root.crt
|
||||||
RUN chmod 600 /var/lib/postgresql/pg_hba.conf
|
RUN chmod 600 /var/lib/postgresql/pg_hba.conf
|
||||||
RUN chmod 600 /var/lib/postgresql/server.key
|
RUN chmod 600 /var/lib/postgresql/server.key
|
||||||
RUN chmod 600 /var/lib/postgresql/server.crt
|
RUN chmod 600 /var/lib/postgresql/server.crt
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
daemonize yes
|
daemonize yes
|
||||||
bind 0.0.0.0 ::
|
bind 0.0.0.0 ::
|
||||||
logfile /var/log/redis-server.log
|
logfile /var/log/redis-server.log
|
||||||
tls-cert-file /tls/redis.crt
|
tls-cert-file /etc/certs/redis.crt
|
||||||
tls-key-file /tls/redis.key
|
tls-key-file /etc/certs/redis.key
|
||||||
tls-ca-cert-file /tls/ca.crt
|
tls-ca-cert-file /etc/certs/ca.crt
|
||||||
tls-replication yes
|
tls-replication yes
|
||||||
tls-cluster yes
|
tls-cluster yes
|
||||||
protected-mode no
|
protected-mode no
|
||||||
|
|
|
@ -91,7 +91,7 @@ do
|
||||||
fi
|
fi
|
||||||
if [ "${node}" = "cluster" ] ; then
|
if [ "${node}" = "cluster" ] ; then
|
||||||
if $tls ; then
|
if $tls ; then
|
||||||
yes "yes" | redis-cli --cluster create "$LOCAL_IP:8000" "$LOCAL_IP:8001" "$LOCAL_IP:8002" --pass public --no-auth-warning --tls true --cacert /tls/ca.crt --cert /tls/redis.crt --key /tls/redis.key;
|
yes "yes" | redis-cli --cluster create "$LOCAL_IP:8000" "$LOCAL_IP:8001" "$LOCAL_IP:8002" --pass public --no-auth-warning --tls true --cacert /etc/certs/ca.crt --cert /etc/certs/redis.crt --key /etc/certs/redis.key;
|
||||||
else
|
else
|
||||||
yes "yes" | redis-cli --cluster create "$LOCAL_IP:7000" "$LOCAL_IP:7001" "$LOCAL_IP:7002" --pass public --no-auth-warning;
|
yes "yes" | redis-cli --cluster create "$LOCAL_IP:7000" "$LOCAL_IP:7001" "$LOCAL_IP:7002" --pass public --no-auth-warning;
|
||||||
fi
|
fi
|
||||||
|
@ -107,9 +107,9 @@ EOF
|
||||||
cat >>/_sentinel.conf<<EOF
|
cat >>/_sentinel.conf<<EOF
|
||||||
tls-port 26380
|
tls-port 26380
|
||||||
tls-replication yes
|
tls-replication yes
|
||||||
tls-cert-file /tls/redis.crt
|
tls-cert-file /etc/certs/redis.crt
|
||||||
tls-key-file /tls/redis.key
|
tls-key-file /etc/certs/redis.key
|
||||||
tls-ca-cert-file /tls/ca.crt
|
tls-ca-cert-file /etc/certs/ca.crt
|
||||||
sentinel monitor mymaster $LOCAL_IP 8000 1
|
sentinel monitor mymaster $LOCAL_IP 8000 1
|
||||||
EOF
|
EOF
|
||||||
else
|
else
|
||||||
|
|
|
@ -16,7 +16,12 @@ authz:{
|
||||||
username: root
|
username: root
|
||||||
password: public
|
password: public
|
||||||
auto_reconnect: true
|
auto_reconnect: true
|
||||||
ssl: false
|
ssl: {
|
||||||
|
enable: true
|
||||||
|
cacertfile: "etc/certs/cacert.pem"
|
||||||
|
certfile: "etc/certs/client-cert.pem"
|
||||||
|
keyfile: "etc/certs/client-key.pem"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
||||||
},
|
},
|
||||||
|
@ -29,7 +34,7 @@ authz:{
|
||||||
username: root
|
username: root
|
||||||
password: public
|
password: public
|
||||||
auto_reconnect: true
|
auto_reconnect: true
|
||||||
ssl: false
|
ssl: {enable: false}
|
||||||
}
|
}
|
||||||
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
||||||
},
|
},
|
||||||
|
@ -41,7 +46,7 @@ authz:{
|
||||||
pool_size: 1
|
pool_size: 1
|
||||||
password: public
|
password: public
|
||||||
auto_reconnect: true
|
auto_reconnect: true
|
||||||
ssl: false
|
ssl: {enable: false}
|
||||||
}
|
}
|
||||||
cmd: "HGETALL mqtt_acl:%u"
|
cmd: "HGETALL mqtt_acl:%u"
|
||||||
},
|
},
|
||||||
|
|
|
@ -9,7 +9,12 @@ authz:{
|
||||||
# username: root
|
# username: root
|
||||||
# password: public
|
# password: public
|
||||||
# auto_reconnect: true
|
# auto_reconnect: true
|
||||||
# ssl: false
|
# ssl: {
|
||||||
|
# enable: true
|
||||||
|
# cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
|
# certfile: "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
|
# keyfile: "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
|
# }
|
||||||
# }
|
# }
|
||||||
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
||||||
# },
|
# },
|
||||||
|
@ -22,7 +27,7 @@ authz:{
|
||||||
# username: root
|
# username: root
|
||||||
# password: public
|
# password: public
|
||||||
# auto_reconnect: true
|
# auto_reconnect: true
|
||||||
# ssl: false
|
# ssl: {enable: false}
|
||||||
# }
|
# }
|
||||||
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
||||||
# },
|
# },
|
||||||
|
@ -34,7 +39,7 @@ authz:{
|
||||||
# pool_size: 1
|
# pool_size: 1
|
||||||
# password: public
|
# password: public
|
||||||
# auto_reconnect: true
|
# auto_reconnect: true
|
||||||
# ssl: false
|
# ssl: {enable: false}
|
||||||
# }
|
# }
|
||||||
# cmd: "HGETALL mqtt_acl:%u"
|
# cmd: "HGETALL mqtt_acl:%u"
|
||||||
# },
|
# },
|
||||||
|
|
|
@ -67,10 +67,14 @@ create_resource(#{<<"type">> := DB,
|
||||||
<<"config">> := Config
|
<<"config">> := Config
|
||||||
} = Rule) ->
|
} = Rule) ->
|
||||||
ResourceID = iolist_to_binary([io_lib:format("~s_~s",[?APP, DB]), "_", integer_to_list(erlang:system_time())]),
|
ResourceID = iolist_to_binary([io_lib:format("~s_~s",[?APP, DB]), "_", integer_to_list(erlang:system_time())]),
|
||||||
|
NConfig = case DB of
|
||||||
|
redis -> #{<<"config">> => Config };
|
||||||
|
_ -> Config
|
||||||
|
end,
|
||||||
case emqx_resource:check_and_create(
|
case emqx_resource:check_and_create(
|
||||||
ResourceID,
|
ResourceID,
|
||||||
list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
|
list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
|
||||||
#{<<"config">> => Config })
|
NConfig)
|
||||||
of
|
of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
Rule#{<<"resource_id">> => ResourceID};
|
Rule#{<<"resource_id">> => ResourceID};
|
||||||
|
|
|
@ -55,7 +55,8 @@ set_special_configs(emqx_authz) ->
|
||||||
<<"server">> => <<"127.0.0.1:6379">>,
|
<<"server">> => <<"127.0.0.1:6379">>,
|
||||||
<<"password">> => <<"public">>,
|
<<"password">> => <<"public">>,
|
||||||
<<"pool_size">> => 1,
|
<<"pool_size">> => 1,
|
||||||
<<"auto_reconnect">> => true
|
<<"auto_reconnect">> => true,
|
||||||
|
<<"ssl">> => #{<<"enable">> => false}
|
||||||
},
|
},
|
||||||
<<"principal">> => all,
|
<<"principal">> => all,
|
||||||
<<"cmd">> => <<"fake cmd">>,
|
<<"cmd">> => <<"fake cmd">>,
|
||||||
|
|
|
@ -6,9 +6,13 @@
|
||||||
{applications,
|
{applications,
|
||||||
[kernel,
|
[kernel,
|
||||||
stdlib,
|
stdlib,
|
||||||
|
ecpool,
|
||||||
emqx_resource,
|
emqx_resource,
|
||||||
eredis_cluster,
|
eredis_cluster,
|
||||||
ecpool
|
eredis,
|
||||||
|
epgsql,
|
||||||
|
mysql,
|
||||||
|
mongodb
|
||||||
]},
|
]},
|
||||||
{env,[]},
|
{env,[]},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
structs() -> [""].
|
structs() -> [""].
|
||||||
|
|
||||||
fields("") ->
|
fields("") ->
|
||||||
redis_fields() ++
|
ldap_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
on_jsonify(Config) ->
|
on_jsonify(Config) ->
|
||||||
|
@ -51,10 +51,17 @@ on_start(InstId, #{servers := Servers0,
|
||||||
bind_password := BindPassword,
|
bind_password := BindPassword,
|
||||||
timeout := Timeout,
|
timeout := Timeout,
|
||||||
pool_size := PoolSize,
|
pool_size := PoolSize,
|
||||||
auto_reconnect := AutoReconn} = Config) ->
|
auto_reconnect := AutoReconn,
|
||||||
logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]),
|
ssl := SSL} = Config) ->
|
||||||
|
logger:info("starting ldap connector: ~p, config: ~p", [InstId, Config]),
|
||||||
Servers = [begin proplists:get_value(host, S) end || S <- Servers0],
|
Servers = [begin proplists:get_value(host, S) end || S <- Servers0],
|
||||||
SslOpts = init_ssl_opts(Config, InstId),
|
SslOpts = case maps:get(enable, SSL) of
|
||||||
|
true ->
|
||||||
|
[{ssl, true},
|
||||||
|
{sslopts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
|
||||||
|
];
|
||||||
|
false -> [{ssl, false}]
|
||||||
|
end,
|
||||||
Opts = [{servers, Servers},
|
Opts = [{servers, Servers},
|
||||||
{port, Port},
|
{port, Port},
|
||||||
{bind_dn, BindDn},
|
{bind_dn, BindDn},
|
||||||
|
@ -68,14 +75,14 @@ on_start(InstId, #{servers := Servers0,
|
||||||
{ok, #{poolname => PoolName}}.
|
{ok, #{poolname => PoolName}}.
|
||||||
|
|
||||||
on_stop(InstId, #{poolname := PoolName}) ->
|
on_stop(InstId, #{poolname := PoolName}) ->
|
||||||
logger:info("stopping redis connector: ~p", [InstId]),
|
logger:info("stopping ldap connector: ~p", [InstId]),
|
||||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||||
|
|
||||||
on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
|
on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||||
logger:debug("redis connector ~p received request: ~p, at state: ~p", [InstId, {Base, Filter, Attributes}, State]),
|
logger:debug("ldap connector ~p received request: ~p, at state: ~p", [InstId, {Base, Filter, Attributes}, State]),
|
||||||
case Result = ecpool:pick_and_do(PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of
|
case Result = ecpool:pick_and_do(PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
logger:debug("redis connector ~p do request failed, request: ~p, reason: ~p", [InstId, {Base, Filter, Attributes}, Reason]),
|
logger:debug("ldap connector ~p do request failed, request: ~p, reason: ~p", [InstId, {Base, Filter, Attributes}, Reason]),
|
||||||
emqx_resource:query_failed(AfterQuery);
|
emqx_resource:query_failed(AfterQuery);
|
||||||
_ ->
|
_ ->
|
||||||
emqx_resource:query_success(AfterQuery)
|
emqx_resource:query_success(AfterQuery)
|
||||||
|
@ -116,14 +123,7 @@ connect(Opts) ->
|
||||||
ok = eldap2:simple_bind(LDAP, BindDn, BindPassword),
|
ok = eldap2:simple_bind(LDAP, BindDn, BindPassword),
|
||||||
{ok, LDAP}.
|
{ok, LDAP}.
|
||||||
|
|
||||||
init_ssl_opts(#{ssl := true} = Config, InstId) ->
|
ldap_fields() ->
|
||||||
[{ssl, true},
|
|
||||||
{sslopts, emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)}
|
|
||||||
];
|
|
||||||
init_ssl_opts(_Config, _InstId) ->
|
|
||||||
[{ssl, false}].
|
|
||||||
|
|
||||||
redis_fields() ->
|
|
||||||
[ {servers, fun emqx_connector_schema_lib:servers/1}
|
[ {servers, fun emqx_connector_schema_lib:servers/1}
|
||||||
, {port, fun port/1}
|
, {port, fun port/1}
|
||||||
, {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
, {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
||||||
|
|
|
@ -48,9 +48,16 @@ on_jsonify(Config) ->
|
||||||
on_start(InstId, #{servers := Servers,
|
on_start(InstId, #{servers := Servers,
|
||||||
mongo_type := Type,
|
mongo_type := Type,
|
||||||
database := Database,
|
database := Database,
|
||||||
pool_size := PoolSize} = Config) ->
|
pool_size := PoolSize,
|
||||||
|
ssl := SSL} = Config) ->
|
||||||
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
||||||
SslOpts = init_ssl_opts(Config, InstId),
|
SslOpts = case maps:get(enable, SSL) of
|
||||||
|
true ->
|
||||||
|
[{ssl, true},
|
||||||
|
{ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
|
||||||
|
];
|
||||||
|
false -> [{ssl, false}]
|
||||||
|
end,
|
||||||
Hosts = [string:trim(H) || H <- string:tokens(binary_to_list(Servers), ",")],
|
Hosts = [string:trim(H) || H <- string:tokens(binary_to_list(Servers), ",")],
|
||||||
Opts = [{type, init_type(Type, Config)},
|
Opts = [{type, init_type(Type, Config)},
|
||||||
{hosts, Hosts},
|
{hosts, Hosts},
|
||||||
|
@ -157,13 +164,6 @@ init_worker_options([_ | R], Acc) ->
|
||||||
init_worker_options(R, Acc);
|
init_worker_options(R, Acc);
|
||||||
init_worker_options([], Acc) -> Acc.
|
init_worker_options([], Acc) -> Acc.
|
||||||
|
|
||||||
init_ssl_opts(#{ssl := true} = Config, InstId) ->
|
|
||||||
[{ssl, true},
|
|
||||||
{ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)}
|
|
||||||
];
|
|
||||||
init_ssl_opts(_Config, _InstId) ->
|
|
||||||
[{ssl, false}].
|
|
||||||
|
|
||||||
host_port(HostPort) ->
|
host_port(HostPort) ->
|
||||||
case string:split(HostPort, ":") of
|
case string:split(HostPort, ":") of
|
||||||
[Host, Port] ->
|
[Host, Port] ->
|
||||||
|
|
|
@ -51,14 +51,14 @@ on_start(InstId, #{server := {Host, Port},
|
||||||
username := User,
|
username := User,
|
||||||
password := Password,
|
password := Password,
|
||||||
auto_reconnect := AutoReconn,
|
auto_reconnect := AutoReconn,
|
||||||
pool_size := PoolSize} = Config) ->
|
pool_size := PoolSize,
|
||||||
|
ssl := SSL } = Config) ->
|
||||||
logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]),
|
||||||
SslOpts = case maps:get(ssl, Config) of
|
SslOpts = case maps:get(enable, SSL) of
|
||||||
true ->
|
true ->
|
||||||
[{ssl, [{server_name_indication, disable} |
|
[{ssl, [{server_name_indication, disable} |
|
||||||
emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}];
|
emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
|
||||||
false ->
|
false -> []
|
||||||
[]
|
|
||||||
end,
|
end,
|
||||||
Options = [{host, Host},
|
Options = [{host, Host},
|
||||||
{port, Port},
|
{port, Port},
|
||||||
|
|
|
@ -50,14 +50,14 @@ on_start(InstId, #{server := {Host, Port},
|
||||||
username := User,
|
username := User,
|
||||||
password := Password,
|
password := Password,
|
||||||
auto_reconnect := AutoReconn,
|
auto_reconnect := AutoReconn,
|
||||||
pool_size := PoolSize} = Config) ->
|
pool_size := PoolSize,
|
||||||
|
ssl := SSL } = Config) ->
|
||||||
logger:info("starting postgresql connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting postgresql connector: ~p, config: ~p", [InstId, Config]),
|
||||||
SslOpts = case maps:get(ssl, Config) of
|
SslOpts = case maps:get(enable, SSL) of
|
||||||
true ->
|
true ->
|
||||||
[{ssl_opts, [{server_name_indication, disable} |
|
[{ssl, [{server_name_indication, disable} |
|
||||||
emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}];
|
emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
|
||||||
false ->
|
false -> []
|
||||||
[]
|
|
||||||
end,
|
end,
|
||||||
Options = [{host, Host},
|
Options = [{host, Host},
|
||||||
{port, Port},
|
{port, Port},
|
||||||
|
|
|
@ -81,7 +81,8 @@ on_jsonify(Config) ->
|
||||||
on_start(InstId, #{config :=#{redis_type := Type,
|
on_start(InstId, #{config :=#{redis_type := Type,
|
||||||
database := Database,
|
database := Database,
|
||||||
pool_size := PoolSize,
|
pool_size := PoolSize,
|
||||||
auto_reconnect := AutoReconn} = Config}) ->
|
auto_reconnect := AutoReconn,
|
||||||
|
ssl := SSL } = Config}) ->
|
||||||
logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]),
|
||||||
Servers = case Type of
|
Servers = case Type of
|
||||||
single -> [{servers, [maps:get(server, Config)]}];
|
single -> [{servers, [maps:get(server, Config)]}];
|
||||||
|
@ -92,8 +93,13 @@ on_start(InstId, #{config :=#{redis_type := Type,
|
||||||
{password, maps:get(password, Config, "")},
|
{password, maps:get(password, Config, "")},
|
||||||
{auto_reconnect, reconn_interval(AutoReconn)}
|
{auto_reconnect, reconn_interval(AutoReconn)}
|
||||||
] ++ Servers,
|
] ++ Servers,
|
||||||
Options = init_ssl_opts(Config, InstId) ++
|
Options = case maps:get(enable, SSL) of
|
||||||
[{sentinel, maps:get(sentinel, Config, undefined)}],
|
true ->
|
||||||
|
[{ssl, true},
|
||||||
|
{ssl_options, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
|
||||||
|
];
|
||||||
|
false -> [{ssl, false}]
|
||||||
|
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
|
||||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||||
case Type of
|
case Type of
|
||||||
cluster ->
|
cluster ->
|
||||||
|
@ -157,13 +163,6 @@ cmd(Conn, _Type, Command) ->
|
||||||
connect(Opts) ->
|
connect(Opts) ->
|
||||||
eredis:start_link(Opts).
|
eredis:start_link(Opts).
|
||||||
|
|
||||||
init_ssl_opts(#{ssl := true} = Config, InstId) ->
|
|
||||||
[{ssl, true},
|
|
||||||
{ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)}
|
|
||||||
];
|
|
||||||
init_ssl_opts(_Config, _InstId) ->
|
|
||||||
[{ssl, false}].
|
|
||||||
|
|
||||||
redis_fields() ->
|
redis_fields() ->
|
||||||
[ {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
[ {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
||||||
, {password, fun emqx_connector_schema_lib:password/1}
|
, {password, fun emqx_connector_schema_lib:password/1}
|
||||||
|
|
|
@ -51,6 +51,31 @@
|
||||||
, servers/0
|
, servers/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([structs/0, fields/1]).
|
||||||
|
|
||||||
|
structs() -> [ssl_on, ssl_off].
|
||||||
|
|
||||||
|
fields(ssl_on) ->
|
||||||
|
[ {enable, #{type => true}}
|
||||||
|
, {cacertfile, fun cacertfile/1}
|
||||||
|
, {keyfile, fun keyfile/1}
|
||||||
|
, {certfile, fun certfile/1}
|
||||||
|
, {verify, fun verify/1}
|
||||||
|
];
|
||||||
|
|
||||||
|
fields(ssl_off) ->
|
||||||
|
[ {enable, #{type => false}} ].
|
||||||
|
|
||||||
|
ssl_fields() ->
|
||||||
|
[ {ssl, #{type => hoconsc:union(
|
||||||
|
[ hoconsc:ref(?MODULE, ssl_on)
|
||||||
|
, hoconsc:ref(?MODULE, ssl_off)
|
||||||
|
]),
|
||||||
|
default => hoconsc:ref(?MODULE, ssl_off)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
].
|
||||||
|
|
||||||
relational_db_fields() ->
|
relational_db_fields() ->
|
||||||
[ {server, fun server/1}
|
[ {server, fun server/1}
|
||||||
, {database, fun database/1}
|
, {database, fun database/1}
|
||||||
|
@ -60,14 +85,6 @@ relational_db_fields() ->
|
||||||
, {auto_reconnect, fun auto_reconnect/1}
|
, {auto_reconnect, fun auto_reconnect/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
ssl_fields() ->
|
|
||||||
[ {ssl, fun ssl/1}
|
|
||||||
, {cacertfile, fun cacertfile/1}
|
|
||||||
, {keyfile, fun keyfile/1}
|
|
||||||
, {certfile, fun certfile/1}
|
|
||||||
, {verify, fun verify/1}
|
|
||||||
].
|
|
||||||
|
|
||||||
server(type) -> emqx_schema:ip_port();
|
server(type) -> emqx_schema:ip_port();
|
||||||
server(validator) -> [?REQUIRED("the field 'server' is required")];
|
server(validator) -> [?REQUIRED("the field 'server' is required")];
|
||||||
server(_) -> undefined.
|
server(_) -> undefined.
|
||||||
|
@ -93,19 +110,15 @@ auto_reconnect(type) -> boolean();
|
||||||
auto_reconnect(default) -> true;
|
auto_reconnect(default) -> true;
|
||||||
auto_reconnect(_) -> undefined.
|
auto_reconnect(_) -> undefined.
|
||||||
|
|
||||||
ssl(type) -> boolean();
|
cacertfile(type) -> string();
|
||||||
ssl(default) -> false;
|
|
||||||
ssl(_) -> undefined.
|
|
||||||
|
|
||||||
cacertfile(type) -> binary();
|
|
||||||
cacertfile(default) -> "";
|
cacertfile(default) -> "";
|
||||||
cacertfile(_) -> undefined.
|
cacertfile(_) -> undefined.
|
||||||
|
|
||||||
keyfile(type) -> binary();
|
keyfile(type) -> string();
|
||||||
keyfile(default) -> "";
|
keyfile(default) -> "";
|
||||||
keyfile(_) -> undefined.
|
keyfile(_) -> undefined.
|
||||||
|
|
||||||
certfile(type) -> binary();
|
certfile(type) -> string();
|
||||||
certfile(default) -> "";
|
certfile(default) -> "";
|
||||||
certfile(_) -> undefined.
|
certfile(_) -> undefined.
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ save_files_return_opts(Options, Dir) ->
|
||||||
Get = fun(Key) -> GetD(Key, undefined) end,
|
Get = fun(Key) -> GetD(Key, undefined) end,
|
||||||
KeyFile = Get(keyfile),
|
KeyFile = Get(keyfile),
|
||||||
CertFile = Get(certfile),
|
CertFile = Get(certfile),
|
||||||
CAFile = GetD(cacertfile, Get(cafile)),
|
CAFile = Get(cacertfile),
|
||||||
Key = do_save_file(KeyFile, Dir),
|
Key = do_save_file(KeyFile, Dir),
|
||||||
Cert = do_save_file(CertFile, Dir),
|
Cert = do_save_file(CertFile, Dir),
|
||||||
CA = do_save_file(CAFile, Dir),
|
CA = do_save_file(CAFile, Dir),
|
||||||
|
|
Loading…
Reference in New Issue