%%-------------------------------------------------------------------- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_redis). -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]). -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource -export([ callback_mode/0, on_start/2, on_stop/2, on_query/3, on_get_status/2 ]). -export([do_get_status/1]). -export([connect/1]). -export([do_cmd/3]). %% redis host don't need parse -define(REDIS_HOST_OPTIONS, #{ default_port => ?REDIS_DEFAULT_PORT }). %%===================================================================== namespace() -> "redis". roots() -> [ {config, #{ type => hoconsc:union( [ ?R_REF(redis_cluster), ?R_REF(redis_single), ?R_REF(redis_sentinel) ] ) }} ]. fields(redis_single) -> fields(redis_single_connector) ++ emqx_connector_schema_lib:ssl_fields(); fields(redis_single_connector) -> [ {server, server()}, redis_type(single) ] ++ redis_fields(); fields(redis_cluster) -> fields(redis_cluster_connector) ++ emqx_connector_schema_lib:ssl_fields(); fields(redis_cluster_connector) -> [ {servers, servers()}, redis_type(cluster) ] ++ lists:keydelete(database, 1, redis_fields()); fields(redis_sentinel) -> fields(redis_sentinel_connector) ++ emqx_connector_schema_lib:ssl_fields(); fields(redis_sentinel_connector) -> [ {servers, servers()}, redis_type(sentinel), {sentinel, #{ type => string(), required => true, desc => ?DESC("sentinel_desc") }} ] ++ redis_fields(). server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS). servers() -> Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS). desc(redis_cluster_connector) -> ?DESC(redis_cluster_connector); desc(redis_single_connector) -> ?DESC(redis_single_connector); desc(redis_sentinel_connector) -> ?DESC(redis_sentinel_connector); desc(_) -> undefined. %% =================================================================== redis_type(Type) -> {redis_type, #{ type => Type, default => Type, required => false, desc => ?DESC(Type) }}. callback_mode() -> always_sync. on_start(InstId, Config0) -> ?SLOG(info, #{ msg => "starting_redis_connector", connector => InstId, config => emqx_utils:redact(Config0) }), Config = config(Config0), #{pool_size := PoolSize, ssl := SSL, redis_type := Type} = Config, Options = ssl_options(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}], Opts = [ {username, maps:get(username, Config, undefined)}, {password, maps:get(password, Config, "")}, {servers, servers(Config)}, {options, Options}, {pool_size, PoolSize}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} ] ++ database(Config), State = #{pool_name => InstId, type => Type}, ok = emqx_resource:allocate_resource(InstId, type, Type), ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case Type of cluster -> case eredis_cluster:start_pool(InstId, Opts) of {ok, _} -> {ok, State}; {ok, _, _} -> {ok, State}; {error, Reason} -> {error, Reason} end; _ -> case emqx_resource_pool:start(InstId, ?MODULE, Opts) of ok -> {ok, State}; {error, Reason} -> {error, Reason} end end. ssl_options(SSL = #{enable := true}) -> [ {ssl, true}, {ssl_options, emqx_tls_lib:to_client_opts(SSL)} ]; ssl_options(#{enable := false}) -> [{ssl, false}]. on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_redis_connector", connector => InstId }), case emqx_resource:get_allocated_resources(InstId) of #{pool_name := PoolName, type := cluster} -> case eredis_cluster:stop_pool(PoolName) of {error, not_found} -> ok; ok -> ok; Error -> Error end; #{pool_name := PoolName, type := _} -> emqx_resource_pool:stop(PoolName); _ -> ok end. on_query(InstId, {cmd, _} = Query, State) -> do_query(InstId, Query, State); on_query(InstId, {cmds, _} = Query, State) -> do_query(InstId, Query, State). do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) -> ?TRACE( "QUERY", "redis_connector_received", #{connector => InstId, query => Query, state => State} ), Result = case Type of cluster -> do_cmd(PoolName, cluster, Query); _ -> ecpool:pick_and_do(PoolName, {?MODULE, do_cmd, [Type, Query]}, no_handover) end, case Result of {error, Reason} -> ?SLOG(error, #{ msg => "redis_connector_do_query_failed", connector => InstId, query => Query, reason => Reason }), case is_unrecoverable_error(Reason) of true -> {error, {unrecoverable_error, Reason}}; false when Reason =:= ecpool_empty -> {error, {recoverable_error, Reason}}; false -> Result end; _ -> Result end. is_unrecoverable_error(Results) when is_list(Results) -> lists:any(fun is_unrecoverable_error/1, Results); is_unrecoverable_error({error, <<"ERR unknown command ", _/binary>>}) -> true; is_unrecoverable_error({error, invalid_cluster_command}) -> true; is_unrecoverable_error(_) -> false. on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) -> case eredis_cluster:pool_exists(PoolName) of true -> %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster. %% we need restart eredis_cluster pool when pool_worker(slot) is empty. %% If the pool is empty, it means that there are no workers attempting to reconnect. %% In this case, we can directly consider it as a disconnect and then proceed to reconnect. case eredis_cluster_monitor:get_all_pools(PoolName) of [] -> ?status_disconnected; [_ | _] -> do_cluster_status_check(PoolName, State) end; false -> ?status_disconnected end; on_get_status(_InstId, #{pool_name := PoolName} = State) -> HealthCheckResoults = emqx_resource_pool:health_check_workers( PoolName, fun ?MODULE:do_get_status/1, emqx_resource_pool:health_check_timeout(), #{return_values => true} ), case HealthCheckResoults of {ok, Results} -> sum_worker_results(Results, State); Error -> {?status_disconnected, State, Error} end. do_cluster_status_check(Pool, State) -> Pongs = eredis_cluster:qa(Pool, [<<"PING">>]), sum_worker_results(Pongs, State). do_get_status(Conn) -> eredis:q(Conn, ["PING"]). sum_worker_results([], _State) -> ?status_connected; sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) -> ?tp(emqx_redis_auth_required_error, #{}), %% This requires user action to fix so we set the status to disconnected {?status_disconnected, State, {unhealthy_target, Error}}; sum_worker_results([{ok, _} | Rest], State) -> sum_worker_results(Rest, State); sum_worker_results([Error | _Rest], State) -> ?SLOG( warning, #{ msg => "emqx_redis_check_status_error", error => Error } ), {?status_connecting, State, Error}. do_cmd(PoolName, cluster, {cmd, Command}) -> eredis_cluster:q(PoolName, Command); do_cmd(Conn, _Type, {cmd, Command}) -> eredis:q(Conn, Command); do_cmd(PoolName, cluster, {cmds, Commands}) -> % TODO % Cluster mode is currently incompatible with batching. wrap_qp_result([eredis_cluster:q(PoolName, Command) || Command <- Commands]); do_cmd(Conn, _Type, {cmds, Commands}) -> wrap_qp_result(eredis:qp(Conn, Commands)). wrap_qp_result({error, _} = Error) -> Error; wrap_qp_result(Results) when is_list(Results) -> AreAllOK = lists:all( fun ({ok, _}) -> true; ({error, _}) -> false end, Results ), case AreAllOK of true -> {ok, Results}; false -> {error, Results} end. %% =================================================================== %% parameters for connector config(#{parameters := #{} = Param} = Config) -> maps:merge(maps:remove(parameters, Config), Param); %% is for authn/authz config(Config) -> Config. servers(#{server := Server}) -> servers(Server); servers(#{servers := Servers}) -> servers(Servers); servers(Servers) -> lists:map( fun(#{hostname := Host, port := Port}) -> {Host, Port} end, emqx_schema:parse_servers(Servers, ?REDIS_HOST_OPTIONS) ). database(#{redis_type := cluster}) -> []; database(#{database := Database}) -> [{database, Database}]. connect(Opts) -> eredis:start_link(Opts). redis_fields() -> [ {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {username, fun emqx_connector_schema_lib:username/1}, {password, emqx_connector_schema_lib:password_field()}, {database, #{ type => non_neg_integer(), default => 0, desc => ?DESC("database") }}, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ].