emqx/apps/emqx_connector/src/emqx_connector_redis.erl

289 lines
8.6 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_redis).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-export([roots/0, fields/1]).
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([
on_start/2,
on_stop/2,
on_query/4,
on_get_status/2
]).
-export([do_get_status/1]).
-export([connect/1]).
-export([cmd/3]).
%% Redis hosts do not need to be parsed
-define(REDIS_HOST_OPTIONS, #{
host_type => hostname,
default_port => ?REDIS_DEFAULT_PORT
}).
%%=====================================================================
%% @doc Schema roots of this module: a single `config' entry whose type is
%% the union of the `cluster', `single' and `sentinel' field sets defined by
%% fields/1 (in that order).
roots() ->
    Variants = [hoconsc:ref(?MODULE, Name) || Name <- [cluster, single, sentinel]],
    [{config, #{type => hoconsc:union(Variants)}}].
%% @doc Field definitions for each Redis deployment flavour.
%%
%% Every flavour starts with its own specific fields, followed by the common
%% fields from redis_fields/0 and the SSL fields from
%% emqx_connector_schema_lib:ssl_fields/0.
%%   single   - one `server' plus a `redis_type' fixed to `single'
%%   cluster  - a `servers' list plus a `redis_type' fixed to `cluster'
%%   sentinel - a `servers' list, a `redis_type' fixed to `sentinel' and a
%%              required `sentinel' string
fields(single) ->
    Specific = [
        {server, fun server/1},
        {redis_type, #{
            type => hoconsc:enum([single]),
            required => true,
            desc => ?DESC("single")
        }}
    ],
    lists:append([Specific, redis_fields(), emqx_connector_schema_lib:ssl_fields()]);
fields(cluster) ->
    Specific = [
        {servers, fun servers/1},
        {redis_type, #{
            type => hoconsc:enum([cluster]),
            required => true,
            desc => ?DESC("cluster")
        }}
    ],
    lists:append([Specific, redis_fields(), emqx_connector_schema_lib:ssl_fields()]);
fields(sentinel) ->
    Specific = [
        {servers, fun servers/1},
        {redis_type, #{
            type => hoconsc:enum([sentinel]),
            required => true,
            desc => ?DESC("sentinel")
        }},
        {sentinel, #{
            type => string(),
            required => true,
            desc => ?DESC("sentinel_desc")
        }}
    ],
    lists:append([Specific, redis_fields(), emqx_connector_schema_lib:ssl_fields()]).
%% @doc Schema property lookup for the `server' field.
%% Returns the property value for the requested key, or `undefined' for any
%% key this field does not define. The raw value is converted with
%% to_server_raw/1.
server(Property) ->
    case Property of
        type -> emqx_schema:ip_port();
        required -> true;
        validator -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
        converter -> fun to_server_raw/1;
        desc -> ?DESC("server");
        _ -> undefined
    end.
%% @doc Schema property lookup for the `servers' field.
%% Returns the property value for the requested key, or `undefined' for any
%% key this field does not define. The raw value is converted with
%% to_servers_raw/1.
servers(Property) ->
    case Property of
        type -> list();
        required -> true;
        validator -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
        converter -> fun to_servers_raw/1;
        desc -> ?DESC("servers");
        _ -> undefined
    end.
%% ===================================================================
%% @doc emqx_resource callback: start the Redis connection pool for `InstId'.
%%
%% `Config' must contain `redis_type', `database', `pool_size',
%% `auto_reconnect' and `ssl'; `password' and `sentinel' are optional, and
%% either `server' (type `single') or `servers' (any other type) is read
%% depending on `redis_type'.
%%
%% For `cluster' the pool is started through eredis_cluster:start_pool/2,
%% otherwise through emqx_plugin_libs_pool:start_pool/3 with this module as
%% the worker module.
%%
%% Returns `{ok, State}' where `State' carries `poolname', `type' and
%% `auto_reconnect', or `{error, Reason}' when the pool could not be started.
on_start(
    InstId,
    #{
        redis_type := Type,
        database := Database,
        pool_size := PoolSize,
        auto_reconnect := AutoReconn,
        ssl := SSL
    } = Config
) ->
    ?SLOG(info, #{
        msg => "starting_redis_connector",
        connector => InstId,
        %% The password must never end up in the log, so it is dropped from
        %% the logged copy of the configuration.
        config => maps:without([password], Config)
    }),
    Servers =
        case Type of
            single -> [{servers, [maps:get(server, Config)]}];
            _ -> [{servers, maps:get(servers, Config)}]
        end,
    Opts =
        [
            {pool_size, PoolSize},
            {database, Database},
            {password, maps:get(password, Config, "")},
            {auto_reconnect, reconn_interval(AutoReconn)}
        ] ++ Servers,
    %% The `sentinel' entry is always present; it is `undefined' when the
    %% configuration does not provide one.
    Options =
        case maps:get(enable, SSL) of
            true ->
                [
                    {ssl, true},
                    {ssl_options, emqx_tls_lib:to_client_opts(SSL)}
                ];
            false ->
                [{ssl, false}]
        end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
    PoolName = emqx_plugin_libs_pool:pool_name(InstId),
    State = #{poolname => PoolName, type => Type, auto_reconnect => AutoReconn},
    PoolOpts = Opts ++ [{options, Options}],
    case Type of
        cluster ->
            case eredis_cluster:start_pool(PoolName, PoolOpts) of
                {ok, _} -> {ok, State};
                {ok, _, _} -> {ok, State};
                {error, _} = Error -> Error
            end;
        _ ->
            case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, PoolOpts) of
                ok -> {ok, State};
                {error, _} = Error -> Error
            end
    end.
%% @doc emqx_resource callback: stop the pool recorded in the resource state.
%% Cluster pools are stopped through eredis_cluster:stop_pool/1, every other
%% type through emqx_plugin_libs_pool:stop_pool/1.
on_stop(InstId, #{poolname := PoolName, type := Type}) ->
    ?SLOG(info, #{msg => "stopping_redis_connector", connector => InstId}),
    case Type =:= cluster of
        true -> eredis_cluster:stop_pool(PoolName);
        false -> emqx_plugin_libs_pool:stop_pool(PoolName)
    end.
%% @doc emqx_resource callback: run a `{cmd, Command}' request against Redis.
%%
%% Cluster pools are queried with eredis_cluster:q/2; other types pick a
%% worker from the ecpool pool and run cmd/3 on it. An `{error, Reason}'
%% result is logged and reported via emqx_resource:query_failed/1, anything
%% else via emqx_resource:query_success/1. The raw result is returned
%% unchanged in both cases.
on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
    ?TRACE(
        "QUERY",
        "redis_connector_received",
        #{connector => InstId, sql => Command, state => State}
    ),
    Outcome =
        if
            Type =:= cluster ->
                eredis_cluster:q(PoolName, Command);
            true ->
                ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover)
        end,
    case Outcome of
        {error, Reason} = Failure ->
            ?SLOG(error, #{
                msg => "redis_connector_do_cmd_query_failed",
                connector => InstId,
                sql => Command,
                reason => Reason
            }),
            emqx_resource:query_failed(AfterCommand),
            Failure;
        Success ->
            emqx_resource:query_success(AfterCommand),
            Success
    end.
%% Collect the workers of every pool that eredis_cluster_monitor reports for
%% `PoolName' by asking each pool process for `get_all_workers', and return
%% them as one flattened list.
extract_eredis_cluster_workers(PoolName) ->
    PoolPids = eredis_cluster_monitor:get_all_pools(PoolName),
    WorkersPerPool = lists:map(
        fun(PoolPid) -> gen_server:call(PoolPid, get_all_workers) end,
        PoolPids
    ),
    lists:flatten(WorkersPerPool).
%% Return `true' only when there is at least one worker and every worker
%% reports itself as connected; an empty worker list yields `false'.
%% Each worker is expected to be a 4-tuple whose second element is the worker
%% pid. Emptiness is decided by pattern matching rather than length/1, so the
%% list is not walked an extra time.
eredis_cluster_workers_exist_and_are_connected([]) ->
    false;
eredis_cluster_workers_exist_and_are_connected([_ | _] = Workers) ->
    lists:all(
        fun({_, Pid, _, _}) ->
            eredis_cluster_pool_worker:is_connected(Pid) =:= true
        end,
        Workers
    ).
%% @doc emqx_resource callback: report the connection status of the pool.
%%
%% Returns `connected', `connecting' or `disconnected'.
%%
%% Cluster: when the pool does not exist the status is `disconnected';
%% otherwise the pool is healthy only if it has workers and all of them are
%% connected. Other types: every ecpool worker is checked with
%% do_get_status/1. An unhealthy pool maps to `connecting' when
%% auto-reconnect is enabled and to `disconnected' otherwise (see
%% status_result/2).
on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect := AutoReconn}) ->
    case eredis_cluster:pool_exists(PoolName) of
        true ->
            Workers = extract_eredis_cluster_workers(PoolName),
            Health = eredis_cluster_workers_exist_and_are_connected(Workers),
            status_result(Health, AutoReconn);
        false ->
            %% Same status atom that status_result/2 produces.
            disconnected
    end;
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
    Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
    status_result(Health, AutoReconn).
%% @doc Health probe for a single eredis connection: send `PING' and return
%% `true' when the reply is `{ok, _}', `false' for any other reply.
do_get_status(Conn) ->
    PingReply = eredis:q(Conn, ["PING"]),
    case PingReply of
        {ok, _Pong} -> true;
        _NotOk -> false
    end.
%% Map a health flag and the auto-reconnect setting to a resource status:
%%   healthy                          -> connected
%%   unhealthy, auto-reconnect on     -> connecting
%%   unhealthy, auto-reconnect off    -> disconnected
status_result(true, _) -> connected;
status_result(false, true) -> connecting;
status_result(false, false) -> disconnected.
%% Translate the boolean `auto_reconnect' setting into the value handed to
%% the pool: `false' disables reconnecting, `true' selects an interval of 15.
%% NOTE(review): the unit of 15 is presumably seconds - confirm against the
%% pool implementation.
reconn_interval(false) -> false;
reconn_interval(true) -> 15.
%% @doc Run `Command' on connection `Conn'.
%% Type `cluster' goes through eredis_cluster:q/2, every other type through
%% eredis:q/2. The result of the underlying call is returned as is.
cmd(Conn, Type, Command) ->
    case Type of
        cluster -> eredis_cluster:q(Conn, Command);
        _ -> eredis:q(Conn, Command)
    end.
%% ===================================================================
%% @doc Start a linked eredis client with the given options and return the
%% result of eredis:start_link/1.
%% NOTE(review): presumably invoked by the pool as the worker connect
%% callback, since on_start/2 passes this module to the pool - confirm.
connect(Opts) ->
eredis:start_link(Opts).
%% Fields shared by all Redis deployment flavours: pool size, password,
%% database index (integer, default 0) and the auto-reconnect switch.
redis_fields() ->
    DatabaseField =
        {database, #{
            type => integer(),
            default => 0,
            required => true,
            desc => ?DESC("database")
        }},
    [
        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
        {password, fun emqx_connector_schema_lib:password/1},
        DatabaseField,
        {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
    ].
%% Converter for the `server' field: parse one server entry with the Redis
%% host options (hostname host type, default Redis port).
-spec to_server_raw(string()) ->
    {string(), pos_integer()}.
to_server_raw(Server) ->
    HostOptions = ?REDIS_HOST_OPTIONS,
    emqx_connector_schema_lib:parse_server(Server, HostOptions).
%% Converter for the `servers' field: split the raw value on commas and/or
%% spaces and parse every entry with the Redis host options.
%%
%% The raw value may be an atom, a binary or a string; it is normalised to a
%% string with str/1 first, which is why the spec accepts all three.
%% Empty entries (e.g. from ", " sequences) are dropped by the split.
-spec to_servers_raw(atom() | binary() | string()) ->
    [{string(), pos_integer()}].
to_servers_raw(Servers) ->
    [
        emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS)
     || Server <- string:lexemes(str(Servers), ", ")
    ].
%% Normalise an atom, binary or list to a list (string).
%% Lists are returned unchanged; any other term fails with function_clause.
str(Chars) when is_list(Chars) ->
    Chars;
str(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
str(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).