diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 316f4b77c..ae6f1d547 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -12,6 +12,7 @@ -type flag() :: true | false. -type duration() :: integer(). -type duration_s() :: integer(). +-type duration_ms() :: integer(). -type bytesize() :: integer(). -type percent() :: float(). -type file() :: string(). @@ -22,6 +23,7 @@ -typerefl_from_string({flag/0, emqx_schema, to_flag}). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). +-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). -typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}). -typerefl_from_string({percent/0, emqx_schema, to_percent}). -typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}). @@ -29,13 +31,13 @@ -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). % workaround: prevent being recognized as unused functions --export([to_duration/1, to_duration_s/1, to_bytesize/1, +-export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1, to_flag/1, to_percent/1, to_comma_separated_list/1, to_bar_separated_list/1, to_ip_port/1]). -behaviour(hocon_schema). --reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, +-reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0, bytesize/0, percent/0, file/0, comma_separated_list/0, bar_separated_list/0, ip_port/0]). @@ -1208,6 +1210,12 @@ to_duration_s(Str) -> _ -> {error, Str} end. +to_duration_ms(Str) -> + case hocon_postprocess:duration(Str) of + I when is_integer(I) -> {ok, ceiling(I)}; + _ -> {error, Str} + end. + to_bytesize(Str) -> case hocon_postprocess:bytesize(Str) of I when is_integer(I) -> {ok, I}; diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl new file mode 100644 index 000000000..143816402 --- /dev/null +++ b/apps/emqx_connector/include/emqx_connector.hrl @@ -0,0 +1,4 @@ +-define(VALID, emqx_resource_validator). +-define(REQUIRED(MSG), ?VALID:required(MSG)). +-define(MAX(MAXV), ?VALID:max(number, MAXV)). +-define(MIN(MINV), ?VALID:min(number, MINV)). diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl new file mode 100644 index 000000000..3b0af5aa7 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -0,0 +1,144 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_ldap). + +-include("emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +-export([ schema/0 + ]). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + , on_jsonify/1 + ]). + +-export([do_health_check/1]). + +-export([connect/1]). + +-export([search/4]). +%%===================================================================== +schema() -> + redis_fields() ++ + emqx_connector_schema_lib:ssl_fields(). + +on_jsonify(Config) -> + Config. + +%% =================================================================== +on_start(InstId, #{servers := Servers0, + port := Port, + bind_dn := BindDn, + bind_password := BindPassword, + timeout := Timeout, + pool_size := PoolSize, + auto_reconnect := AutoReconn} = Config) -> + logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]), + Servers = [begin proplists:get_value(host, S) end || S <- Servers0], + SslOpts = init_ssl_opts(Config, InstId), + Opts = [{servers, Servers}, + {port, Port}, + {bind_dn, BindDn}, + {bind_password, BindPassword}, + {timeout, Timeout}, + {pool_size, PoolSize}, + {auto_reconnect, reconn_interval(AutoReconn)}, + {servers, Servers}], + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), + {ok, #{poolname => PoolName}}. + +on_stop(InstId, #{poolname := PoolName}) -> + logger:info("stopping redis connector: ~p", [InstId]), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) -> + logger:debug("redis connector ~p received request: ~p, at state: ~p", [InstId, {Base, Filter, Attributes}, State]), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of + {error, Reason} -> + logger:debug("redis connector ~p do request failed, request: ~p, reason: ~p", [InstId, {Base, Filter, Attributes}, Reason]), + emqx_resource:query_failed(AfterQuery); + _ -> + emqx_resource:query_success(AfterQuery) + end, + Result. + +on_health_check(_InstId, #{poolname := PoolName} = State) -> + emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). + +do_health_check(_Conn) -> + {ok, true}. + +reconn_interval(true) -> 15; +reconn_interval(false) -> false. + +search(Conn, Base, Filter, Attributes) -> + eldap2:search(Conn, [{base, Base}, + {filter, Filter}, + {attributes, Attributes}, + {deref, eldap2:derefFindingBaseObj()}]). + +%% =================================================================== +connect(Opts) -> + Servers = proplists:get_value(servers, Opts, ["localhost"]), + Port = proplists:get_value(port, Opts, 389), + Timeout = proplists:get_value(timeout, Opts, 30), + BindDn = proplists:get_value(bind_dn, Opts), + BindPassword = proplists:get_value(bind_password, Opts), + SslOpts = case proplists:get_value(ssl, Opts, false) of + true -> + [{sslopts, proplists:get_value(sslopts, Opts, [])}, {ssl, true}]; + false -> + [{ssl, false}] + end, + LdapOpts = [{port, Port}, + {timeout, Timeout}] ++ SslOpts, + {ok, LDAP} = eldap2:open(Servers, LdapOpts), + ok = eldap2:simple_bind(LDAP, BindDn, BindPassword), + {ok, LDAP}. + +init_ssl_opts(#{ssl := true} = Config, InstId) -> + [{ssl, true}, + {sslopts, emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)} + ]; +init_ssl_opts(_Config, _InstId) -> + [{ssl, false}]. + +redis_fields() -> + [ {servers, fun emqx_connector_schema_lib:servers/1} + , {port, fun port/1} + , {pool_size, fun emqx_connector_schema_lib:pool_size/1} + , {bind_dn, fun bind_dn/1} + , {bind_password, fun emqx_connector_schema_lib:password/1} + , {timeout, fun duration/1} + , {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} + ]. + +bind_dn(type) -> binary(); +bind_dn(default) -> 0; +bind_dn(_) -> undefined. + +port(type) -> integer(); +port(default) -> 389; +port(_) -> undefined. + +duration(type) -> emqx_schema:duration_ms(); +duration(_) -> undefined. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl new file mode 100644 index 000000000..26a36dd0a --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -0,0 +1,215 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_mongo). + +-include("emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +-export([ schema/0 + ]). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + , on_jsonify/1 + ]). + +-export([connect/1]). + +-export([mongo_query/5]). +%%===================================================================== +schema() -> + mongodb_fields() ++ + mongodb_topology_fields() ++ + mongodb_rs_set_name_fields() ++ + emqx_connector_schema_lib:ssl_fields(). + +on_jsonify(Config) -> + Config. + +%% =================================================================== +on_start(InstId, #{servers := Servers, + mongo_type := Type, + database := Database, + pool_size := PoolSize} = Config) -> + logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), + SslOpts = init_ssl_opts(Config, InstId), + Hosts = [string:trim(H) || H <- string:tokens(binary_to_list(Servers), ",")], + Opts = [{type, init_type(Type, Config)}, + {hosts, Hosts}, + {pool_size, PoolSize}, + {options, init_topology_options(maps:to_list(Config), [])}, + {worker_options, init_worker_options(maps:to_list(Config), SslOpts)}], + + %% test the connection + TestOpts = [{database, Database}] ++ host_port(hd(Hosts)), + {ok, TestConn} = mc_worker_api:connect(TestOpts), + + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), + {ok, #{pool => PoolName, + type => Type, + test_conn => TestConn, + test_opts => TestOpts}}. + +on_stop(InstId, #{poolname := PoolName}) -> + logger:info("stopping mongodb connector: ~p", [InstId]), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname := PoolName} = State) -> + logger:debug("mongodb connector ~p received request: ~p, at state: ~p", [InstId, {Action, Collection, Selector, Docs}, State]), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of + {error, Reason} -> + logger:debug("mongodb connector ~p do sql query failed, request: ~p, reason: ~p", [InstId, {Action, Collection, Selector, Docs}, Reason]), + emqx_resource:query_failed(AfterQuery); + _ -> + emqx_resource:query_success(AfterQuery) + end, + Result. + +-dialyzer({nowarn_function, [on_health_check/2]}). +on_health_check(_InstId, #{test_opts := TestOpts}) -> + case mc_worker_api:connect(TestOpts) of + {ok, TestConn} -> + mc_worker_api:disconnect(TestConn), + {ok, true}; + {error, _} -> + {ok, false} + end. + +%% =================================================================== +connect(Opts) -> + Type = proplists:get_value(mongo_type, Opts, single), + Hosts = proplists:get_value(hosts, Opts, []), + Options = proplists:get_value(options, Opts, []), + WorkerOptions = proplists:get_value(worker_options, Opts, []), + mongo_api:connect(Type, Hosts, Options, WorkerOptions). + +mongo_query(Conn, find, Collection, Selector, Docs) -> + mongo_api:find(Conn, Collection, Selector, Docs); + +%% Todo xxx +mongo_query(_Conn, _Action, _Collection, _Selector, _Docs) -> + ok. + +init_type(rs, #{rs_set_name := Name}) -> + {rs, Name}; +init_type(Type, _Opts) -> + Type. + +init_topology_options([{pool_size, Val}| R], Acc) -> + init_topology_options(R, [{pool_size, Val}| Acc]); +init_topology_options([{max_overflow, Val}| R], Acc) -> + init_topology_options(R, [{max_overflow, Val}| Acc]); +init_topology_options([{overflow_ttl, Val}| R], Acc) -> + init_topology_options(R, [{overflow_ttl, Val}| Acc]); +init_topology_options([{overflow_check_period, Val}| R], Acc) -> + init_topology_options(R, [{overflow_check_period, Val}| Acc]); +init_topology_options([{local_threshold_ms, Val}| R], Acc) -> + init_topology_options(R, [{'localThresholdMS', Val}| Acc]); +init_topology_options([{connect_timeout_ms, Val}| R], Acc) -> + init_topology_options(R, [{'connectTimeoutMS', Val}| Acc]); +init_topology_options([{socket_timeout_ms, Val}| R], Acc) -> + init_topology_options(R, [{'socketTimeoutMS', Val}| Acc]); +init_topology_options([{server_selection_timeout_ms, Val}| R], Acc) -> + init_topology_options(R, [{'serverSelectionTimeoutMS', Val}| Acc]); +init_topology_options([{wait_queue_timeout_ms, Val}| R], Acc) -> + init_topology_options(R, [{'waitQueueTimeoutMS', Val}| Acc]); +init_topology_options([{heartbeat_frequency_ms, Val}| R], Acc) -> + init_topology_options(R, [{'heartbeatFrequencyMS', Val}| Acc]); +init_topology_options([{min_heartbeat_frequency_ms, Val}| R], Acc) -> + init_topology_options(R, [{'minHeartbeatFrequencyMS', Val}| Acc]); +init_topology_options([_| R], Acc) -> + init_topology_options(R, Acc); +init_topology_options([], Acc) -> + Acc. + +init_worker_options([{database, V} | R], Acc) -> + init_worker_options(R, [{database, V} | Acc]); +init_worker_options([{auth_source, V} | R], Acc) -> + init_worker_options(R, [{auth_source, V} | Acc]); +init_worker_options([{login, V} | R], Acc) -> + init_worker_options(R, [{login, V} | Acc]); +init_worker_options([{password, V} | R], Acc) -> + init_worker_options(R, [{password, V} | Acc]); +init_worker_options([{w_mode, V} | R], Acc) -> + init_worker_options(R, [{w_mode, V} | Acc]); +init_worker_options([{r_mode, V} | R], Acc) -> + init_worker_options(R, [{r_mode, V} | Acc]); +init_worker_options([_ | R], Acc) -> + init_worker_options(R, Acc); +init_worker_options([], Acc) -> Acc. + +init_ssl_opts(#{ssl := true} = Config, InstId) -> + [{ssl, true}, + {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)} + ]; +init_ssl_opts(_Config, _InstId) -> + [{ssl, false}]. + +host_port(HostPort) -> + case string:split(HostPort, ":") of + [Host, Port] -> + {ok, Host1} = inet:parse_address(Host), + [{host, Host1}, {port, list_to_integer(Port)}]; + [Host] -> + {ok, Host1} = inet:parse_address(Host), + [{host, Host1}] + end. + +mongodb_fields() -> + [ {mongo_type, fun mongo_type/1} + , {servers, fun servers/1} + , {pool_size, fun emqx_connector_schema_lib:pool_size/1} + , {login, fun emqx_connector_schema_lib:username/1} + , {password, fun emqx_connector_schema_lib:password/1} + , {auth_source, fun auth_source/1} + , {database, fun emqx_connector_schema_lib:database/1} + ]. + +mongodb_topology_fields() -> + [ {max_overflow, fun emqx_connector_schema_lib:pool_size/1} + , {overflow_ttl, fun duration/1} + , {overflow_check_period, fun duration/1} + , {local_threshold_ms, fun duration/1} + , {connect_timeout_ms, fun duration/1} + , {socket_timeout_ms, fun duration/1} + , {server_selection_timeout_ms, fun duration/1} + , {wait_queue_timeout_ms, fun duration/1} + , {heartbeat_frequency_ms, fun duration/1} + , {min_heartbeat_frequency_ms, fun duration/1} + ]. + +mongodb_rs_set_name_fields() -> + [ {rs_set_name, fun emqx_connector_schema_lib:database/1} + ]. + +auth_source(type) -> binary(); +auth_source(_) -> undefined. + +servers(type) -> binary(); +servers(validator) -> [?REQUIRED("the field 'servers' is required")]; +servers(_) -> undefined. + +mongo_type(type) -> hoconsc:enum([single, unknown, shared, rs]); +mongo_type(default) -> single; +mongo_type(_) -> undefined. + +duration(type) -> emqx_schema:duration_ms(); +duration(_) -> undefined. diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 551732211..e90d5f171 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -18,10 +18,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). --export([ structs/0 - , fields/1 - ]). - %% callbacks of behaviour emqx_resource -export([ on_start/2 , on_stop/2 @@ -32,40 +28,27 @@ -export([connect/1]). +-export([schema/0]). + -export([do_health_check/1]). %%===================================================================== - -structs() -> ["config"]. -fields("config") -> schema(). - schema() -> emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields(). -on_jsonify(#{<<"server">> := Server, <<"user">> := User, <<"database">> := DB, - <<"password">> := Passwd, <<"cacertfile">> := CAFile, - <<"keyfile">> := KeyFile, <<"certfile">> := CertFile} = Config) -> - Config#{ - <<"user">> => list_to_binary(User), - <<"database">> => list_to_binary(DB), - <<"password">> => list_to_binary(Passwd), - <<"server">> => emqx_connector_schema_lib:ip_port_to_string(Server), - <<"cacertfile">> => list_to_binary(CAFile), - <<"keyfile">> => list_to_binary(KeyFile), - <<"certfile">> => list_to_binary(CertFile) - }. +on_jsonify(#{server := Server}= Config) -> + Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}. %% =================================================================== -on_start(InstId, #{<<"server">> := {Host, Port}, - <<"database">> := DB, - <<"user">> := User, - <<"password">> := Password, - <<"auto_reconnect">> := AutoReconn, - <<"pool_size">> := PoolSize} = Config) -> +on_start(InstId, #{server := {Host, Port}, + database := DB, + username := User, + password := Password, + auto_reconnect := AutoReconn, + pool_size := PoolSize} = Config) -> logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]), - {ok, _} = application:ensure_all_started(mysql), - SslOpts = case maps:get(<<"ssl">>, Config) of + SslOpts = case maps:get(ssl, Config) of true -> [{ssl, [{server_name_indication, disable} | emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}]; diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl new file mode 100644 index 000000000..329138ab1 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -0,0 +1,121 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_pgsql). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +-export([ schema/0 + ]). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + , on_jsonify/1 + ]). + +-export([connect/1]). + +-export([query/2]). + +-export([do_health_check/1]). + +%%===================================================================== +schema() -> + emqx_connector_schema_lib:relational_db_fields() ++ + emqx_connector_schema_lib:ssl_fields(). + +on_jsonify(#{server := Server}= Config) -> + Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}. + +%% =================================================================== +on_start(InstId, #{server := {Host, Port}, + database := DB, + username := User, + password := Password, + auto_reconnect := AutoReconn, + pool_size := PoolSize} = Config) -> + logger:info("starting postgresql connector: ~p, config: ~p", [InstId, Config]), + SslOpts = case maps:get(ssl, Config) of + true -> + [{ssl_opts, [{server_name_indication, disable} | + emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}]; + false -> + [] + end, + Options = [{host, Host}, + {port, Port}, + {username, User}, + {password, Password}, + {database, DB}, + {auto_reconnect, reconn_interval(AutoReconn)}, + {pool_size, PoolSize}], + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), + {ok, #{poolname => PoolName}}. + +on_stop(InstId, #{poolname := PoolName}) -> + logger:info("stopping postgresql connector: ~p", [InstId]), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> + logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL]}, no_handover) of + {error, Reason} -> + logger:debug("postgresql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), + emqx_resource:query_failed(AfterQuery); + _ -> + emqx_resource:query_success(AfterQuery) + end, + Result. + +on_health_check(_InstId, #{poolname := PoolName} = State) -> + emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). + +do_health_check(Conn) -> + ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). + +%% =================================================================== +reconn_interval(true) -> 15; +reconn_interval(false) -> false. + +connect(Opts) -> + Host = proplists:get_value(host, Opts), + Username = proplists:get_value(username, Opts), + Password = proplists:get_value(password, Opts), + epgsql:connect(Host, Username, Password, conn_opts(Opts)). + +query(Conn, SQL) -> + epgsql:squery(Conn, SQL). + +conn_opts(Opts) -> + conn_opts(Opts, []). +conn_opts([], Acc) -> + Acc; +conn_opts([Opt = {database, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {ssl, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {port, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {timeout, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {ssl_opts, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([_Opt|Opts], Acc) -> + conn_opts(Opts, Acc). diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl new file mode 100644 index 000000000..3eebbcf87 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -0,0 +1,141 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_redis). + +-include("emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +-export([ schema/0 + ]). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + , on_jsonify/1 + ]). + +-export([do_health_check/1]). + +-export([connect/1]). + +-export([cmd/3]). + +%%===================================================================== +schema() -> + redis_fields() ++ + redis_sentinel_fields() ++ + emqx_connector_schema_lib:ssl_fields(). + +on_jsonify(Config) -> + Config. + +%% =================================================================== +on_start(InstId, #{servers := Servers, + redis_type := Type, + database := Database, + pool_size := PoolSize, + auto_reconnect := AutoReconn} = Config) -> + logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]), + SslOpts = init_ssl_opts(Config, InstId), + Opts = [{pool_size, PoolSize}, + {database, Database}, + {password, maps:get(password, Config, "")}, + {auto_reconnect, reconn_interval(AutoReconn)}, + {servers, Servers}], + Options = [{options, SslOpts}, {sentinel, maps:get(sentinel, Config, undefined)}], + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ Options), + {ok, #{poolname => PoolName, type => Type}}. + +on_stop(InstId, #{poolname := PoolName}) -> + logger:info("stopping redis connector: ~p", [InstId]), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> + logger:debug("redis connector ~p received cmd query: ~p, at state: ~p", [InstId, Command, State]), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) of + {error, Reason} -> + logger:debug("redis connector ~p do cmd query failed, cmd: ~p, reason: ~p", [InstId, Command, Reason]), + emqx_resource:query_failed(AfterCommand); + _ -> + emqx_resource:query_success(AfterCommand) + end, + Result. + +on_health_check(_InstId, #{type := cluster, poolname := PoolName}) -> + Workers = lists:flatten([gen_server:call(PoolPid, get_all_workers) || + PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]), + case length(Workers) > 0 andalso lists:all( + fun({_, Pid, _, _}) -> + eredis_cluster_pool_worker:is_connected(Pid) =:= true + end, Workers) of + true -> {ok, true}; + false -> {error, false} + end; +on_health_check(_InstId, #{poolname := PoolName} = State) -> + emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). + +do_health_check(Conn) -> + case eredis:q(Conn, ["PING"]) of + {ok, _} -> true; + _ -> false + end. + +reconn_interval(true) -> 15; +reconn_interval(false) -> false. + +cmd(Conn, cluster, Command) -> + eredis_cluster:q(Conn, Command); +cmd(Conn, _Type, Command) -> + eredis:q(Conn, Command). + +%% =================================================================== +connect(Opts) -> + eredis:start_link(Opts). + +init_ssl_opts(#{ssl := true} = Config, InstId) -> + [{ssl, true}, + {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)} + ]; +init_ssl_opts(_Config, _InstId) -> + [{ssl, false}]. + +redis_fields() -> + [ {redis_type, fun redis_type/1} + , {servers, fun emqx_connector_schema_lib:servers/1} + , {pool_size, fun emqx_connector_schema_lib:pool_size/1} + , {password, fun emqx_connector_schema_lib:password/1} + , {database, fun database/1} + , {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} + ]. + +redis_sentinel_fields() -> + [ {sentinel, fun sentinel_name/1} + ]. + +sentinel_name(type) -> binary(); +sentinel_name(_) -> undefined. + +redis_type(type) -> hoconsc:enum([single, sentinel, cluster]); +redis_type(default) -> single; +redis_type(_) -> undefined. + +database(type) -> integer(); +database(default) -> 0; +database(_) -> undefined. diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 02bed956b..03572d91d 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -14,6 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_connector_schema_lib). + +-include("emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -export([ relational_db_fields/0 @@ -22,24 +24,38 @@ -export([ to_ip_port/1 , ip_port_to_string/1 + , to_servers/1 + ]). + +-export([ pool_size/1 + , database/1 + , username/1 + , password/1 + , servers/1 + , auto_reconnect/1 ]). -typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}). +-typerefl_from_string({servers/0, emqx_connector_schema_lib, to_servers}). --reflect_type([ip_port/0]). +-type database() :: binary(). +-type pool_size() :: integer(). +-type username() :: binary(). +-type password() :: binary(). +-type servers() :: list(). --type ip_port() :: tuple(). - --define(VALID, emqx_resource_validator). --define(REQUIRED(MSG), ?VALID:required(MSG)). --define(MAX(MAXV), ?VALID:max(number, MAXV)). --define(MIN(MINV), ?VALID:min(number, MINV)). +-reflect_type([ database/0 + , pool_size/0 + , username/0 + , password/0 + , servers/0 + ]). relational_db_fields() -> [ {server, fun server/1} , {database, fun database/1} , {pool_size, fun pool_size/1} - , {user, fun user/1} + , {username, fun username/1} , {password, fun password/1} , {auto_reconnect, fun auto_reconnect/1} ]. @@ -52,12 +68,12 @@ ssl_fields() -> , {verify, fun verify/1} ]. -server(type) -> ip_port(); +server(type) -> emqx_schema:ip_port(); server(validator) -> [?REQUIRED("the field 'server' is required")]; server(_) -> undefined. -database(type) -> string(); -database(validator) -> [?REQUIRED("the field 'server' is required")]; +database(type) -> binary(); +database(validator) -> [?REQUIRED("the field 'database' is required")]; database(_) -> undefined. pool_size(type) -> integer(); @@ -65,11 +81,11 @@ pool_size(default) -> 8; pool_size(validator) -> [?MIN(1), ?MAX(64)]; pool_size(_) -> undefined. -user(type) -> string(); -user(default) -> "root"; -user(_) -> undefined. +username(type) -> binary(); +username(default) -> "root"; +username(_) -> undefined. -password(type) -> string(); +password(type) -> binary(); password(default) -> ""; password(_) -> undefined. @@ -81,15 +97,15 @@ ssl(type) -> boolean(); ssl(default) -> false; ssl(_) -> undefined. -cacertfile(type) -> string(); +cacertfile(type) -> binary(); cacertfile(default) -> ""; cacertfile(_) -> undefined. -keyfile(type) -> string(); +keyfile(type) -> binary(); keyfile(default) -> ""; keyfile(_) -> undefined. -certfile(type) -> string(); +certfile(type) -> binary(); certfile(default) -> ""; certfile(_) -> undefined. @@ -97,6 +113,10 @@ verify(type) -> boolean(); verify(default) -> false; verify(_) -> undefined. +servers(type) -> servers(); +servers(validator) -> [?REQUIRED("the field 'servers' is required")]; +servers(_) -> undefined. + to_ip_port(Str) -> case string:tokens(Str, ":") of [Ip, Port] -> @@ -109,3 +129,13 @@ to_ip_port(Str) -> ip_port_to_string({Ip, Port}) -> iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]). + +to_servers(Str) -> + {ok, lists:map(fun(Server) -> + case string:tokens(Server, ":") of + [Ip] -> + [{host, Ip}]; + [Ip, Port] -> + [{host, Ip}, {port, list_to_integer(Port)}] + end + end, string:tokens(Str, " , "))}. diff --git a/apps/emqx_data_bridge/etc/emqx_data_bridge.conf b/apps/emqx_data_bridge/etc/emqx_data_bridge.conf index 056641ebf..50f09b8dd 100644 --- a/apps/emqx_data_bridge/etc/emqx_data_bridge.conf +++ b/apps/emqx_data_bridge/etc/emqx_data_bridge.conf @@ -3,28 +3,124 @@ ##-------------------------------------------------------------------- emqx_data_bridge.bridges: [ -# {name: "mysql-abc" -# type: mysql -# config: { -# server: "127.0.0.1:3306" -# database: mqtt -# pool_size: 1 -# user: root -# password: public -# auto_reconnect: true -# ssl: false -# } -# }, -# {name: "mysql-def" -# type: mysql -# config: { -# server: "127.0.0.1:3306" -# database: mqtt -# pool_size: 1 -# user: root -# password: public -# auto_reconnect: true -# ssl: false -# } -# } + {name: "mysql" + type: mysql + config: { + server: "192.168.0.172:3306" + database: mqtt + pool_size: 1 + username: root + password: public + auto_reconnect: true + ssl: false + } + } + , {name: "pgsql" + type: pgsql + config: { + server: "192.168.0.172:5432" + database: mqtt + pool_size: 1 + username: root + password: public + auto_reconnect: true + ssl: false + } + } + , {name: "mongodb_single" + type: mongo + config: { + servers: "192.168.0.172:27017" + mongo_type: single + pool_size: 1 + login: root + password: public + auth_source: mqtt + database: mqtt + ssl: false + } + } + # ,{name: "mongodb_rs" + # type: mongo + # config: { + # servers: "127.0.0.1:27017" + # mongo_type: rs + # rs_set_name: rs_name + # pool_size: 1 + # login: root + # password: public + # auth_source: mqtt + # database: mqtt + # ssl: false + # } + # } + # ,{name: "mongodb_shared" + # type: mongo + # config: { + # servers: "127.0.0.1:27017" + # mongo_type: shared + # pool_size: 1 + # login: root + # password: public + # auth_source: mqtt + # database: mqtt + # ssl: false + # max_overflow: 1 + # overflow_ttl: + # overflow_check_period: 10s + # local_threshold_ms: 10s + # connect_timeout_ms: 10s + # socket_timeout_ms: 10s + # server_selection_timeout_ms: 10s + # wait_queue_timeout_ms: 10s + # heartbeat_frequency_ms: 10s + # min_heartbeat_frequency_ms: 10s + # } + # } + , {name: "redis_single" + type: redis + config: { + servers: "192.168.0.172:6379" + redis_type: single + pool_size: 1 + database: 0 + password: public + auto_reconnect: true + ssl: false + } + } + # ,{name: "redis_sentinel" + # type: redis + # config: { + # servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379" + # redis_type: sentinel + # sentinel_name: mymaster + # pool_size: 1 + # database: 0 + # ssl: false + # } + # } + # ,{name: "redis_cluster" + # type: redis + # config: { + # servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379" + # redis_type: cluster + # pool_size: 1 + # database: 0 + # password: "public" + # ssl: false + # } + # } + , {name: "ldap" + type: ldap + config: { + servers: "192.168.0.172" + port: 389 + bind_dn: "cn=root,dc=emqx,dc=io" + bind_password: "public" + timeout: 30s + pool_size: 1 + ssl: false + } + } ] diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.erl b/apps/emqx_data_bridge/src/emqx_data_bridge.erl index 2adc48111..5877cd3dd 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.erl @@ -29,9 +29,19 @@ load_bridges() -> application:get_all_env(emqx_data_bridge), []), emqx_data_bridge_monitor:ensure_all_started(Bridges). -resource_type(<<"mysql">>) -> emqx_connector_mysql. +resource_type(<<"mysql">>) -> emqx_connector_mysql; +resource_type(<<"pgsql">>) -> emqx_connector_pgsql; +resource_type(<<"mongo">>) -> emqx_connector_mongo; +resource_type(<<"redis">>) -> emqx_connector_redis; +resource_type(<<"ldap">>) -> emqx_connector_ldap. + + +bridge_type(emqx_connector_mysql) -> <<"mysql">>; +bridge_type(emqx_connector_pgsql) -> <<"pgsql">>; +bridge_type(emqx_connector_mongo) -> <<"mongo">>; +bridge_type(emqx_connector_redis) -> <<"redis">>; +bridge_type(emqx_connector_ldap) -> <<"ldap">>. -bridge_type(emqx_connector_mysql) -> <<"mysql">>. name_to_resource_id(BridgeName) -> <<"bridge:", BridgeName/binary>>. diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index dddc44f4e..71119264d 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -54,5 +54,5 @@ health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> end || {_WorkerName, Worker} <- ecpool:workers(PoolName)], case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of true -> {ok, State}; - false -> {error, test_query_failed} + false -> {error, test_query_failed, State} end. diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl index 9fc9e66ef..8c9d06454 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl @@ -21,7 +21,7 @@ save_file/2 ]). --type file_input_key() :: binary(). %% <<"file">> | <<"filename">> +-type file_input_key() :: atom() | binary(). %% <<"file">> | <<"filename">> -type file_input() :: #{file_input_key() => binary()}. %% options are below paris @@ -53,21 +53,21 @@ save_files_return_opts(Options, SubDir, ResId) -> %% Returns ssl options for Erlang's ssl application. -spec save_files_return_opts(opts_input(), file:name_all()) -> opts(). save_files_return_opts(Options, Dir) -> - GetD = fun(Key, Default) -> maps:get(Key, Options, Default) end, - Get = fun(Key) -> GetD(Key, undefined) end, - KeyFile = Get(<<"keyfile">>), - CertFile = Get(<<"certfile">>), - CAFile = GetD(<<"cacertfile">>, Get(<<"cafile">>)), + GetD = fun(Key, Default) -> maps:get(key_to_atom(Key), Options, Default) end, + Get = fun(Key) -> GetD(key_to_atom(Key), undefined) end, + KeyFile = Get(keyfile), + CertFile = Get(certfile), + CAFile = GetD(cacertfile, Get(cafile)), Key = do_save_file(KeyFile, Dir), Cert = do_save_file(CertFile, Dir), CA = do_save_file(CAFile, Dir), - Verify = case GetD(<<"verify">>, false) of + Verify = case GetD(verify, false) of false -> verify_none; _ -> verify_peer end, - SNI = Get(<<"server_name_indication">>), - Versions = emqx_tls_lib:integral_versions(Get(<<"tls_versions">>)), - Ciphers = emqx_tls_lib:integral_ciphers(Versions, Get(<<"ciphers">>)), + SNI = Get(server_name_indication), + Versions = emqx_tls_lib:integral_versions(Get(tls_versions)), + Ciphers = emqx_tls_lib:integral_ciphers(Versions, Get(ciphers)), filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}, {verify, Verify}, {server_name_indication, SNI}, {versions, Versions}, {ciphers, Ciphers}]). @@ -83,7 +83,7 @@ filter([]) -> []; filter([{_, ""} | T]) -> filter(T); filter([H | T]) -> [H | filter(T)]. -do_save_file(#{<<"filename">> := FileName, <<"file">> := Content}, Dir) +do_save_file(#{filename := FileName, file := Content}, Dir) when FileName =/= undefined andalso Content =/= undefined -> do_save_file(ensure_str(FileName), iolist_to_binary(Content), Dir); do_save_file(FilePath, _) when is_binary(FilePath) -> @@ -108,3 +108,7 @@ do_save_file(FileName, Content, Dir) -> ensure_str(L) when is_list(L) -> L; ensure_str(B) when is_binary(B) -> unicode:characters_to_list(B, utf8). +key_to_atom(B) when is_binary(B) -> + binary_to_atom(B, utf8); +key_to_atom(A) when is_atom(A) -> + A. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5a9b21408..859ff083e 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -282,7 +282,7 @@ check_config(ResourceType, RawConfigTerm) -> do_check_config(ResourceType, MapConfig) -> case ?SAFE_CALL(emqx_resource_schema:check(ResourceType, MapConfig)) of {error, Reason} -> {error, Reason}; - Config -> {ok, maps:get(<<"config">>, hocon_schema:richmap_to_map(Config))} + Config -> {ok, maps:get(config, hocon_schema:richmap_to_map(Config))} end. -spec check_and_create(instance_id(), resource_type(), binary() | term()) -> diff --git a/apps/emqx_resource/src/emqx_resource_schema.erl b/apps/emqx_resource/src/emqx_resource_schema.erl index 1de3e4f84..6cdd13e11 100644 --- a/apps/emqx_resource/src/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/emqx_resource_schema.erl @@ -18,6 +18,17 @@ -export([check/2]). -check(SchemaMod, Conf) -> - hocon_schema:check(SchemaMod, Conf, #{nullable => false}). +-export([structs/0, fields/1]). +-behaviour(hocon_schema). + +check(SchemaMod, Conf) -> + _ = erlang:erase(res_schema_mod), + erlang:put(res_schema_mod, SchemaMod), + hocon_schema:check(?MODULE, Conf, #{atom_key => true, nullable => false}). + +structs() -> ["config"]. + +fields("config") -> + SchemaMod = erlang:get(res_schema_mod), + SchemaMod:schema(). diff --git a/rebar.config b/rebar.config index 109b680c5..9afedcf02 100644 --- a/rebar.config +++ b/rebar.config @@ -34,7 +34,7 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.6"}}} - , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} + , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.6"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}