emqx/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl

239 lines
7.0 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_redis).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([conn_bridge_examples/1]).
-export([type_name_fields/1, connector_fields/1]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% -------------------------------------------------------------------------------------------------
%% api
conn_bridge_examples(Method) ->
[
#{
<<"redis_single">> => #{
summary => <<"Redis Single Node Bridge">>,
value => values("single", Method)
}
},
#{
<<"redis_sentinel">> => #{
summary => <<"Redis Sentinel Bridge">>,
value => values("sentinel", Method)
}
},
#{
<<"redis_cluster">> => #{
summary => <<"Redis Cluster Bridge">>,
value => values("cluster", Method)
}
}
].
values(Protocol, get) ->
values(Protocol, post);
values("single", post) ->
SpecificOpts = #{
server => <<"127.0.0.1:6379">>,
redis_type => single,
database => 1
},
values(common, "single", SpecificOpts);
values("sentinel", post) ->
SpecificOpts = #{
servers => [<<"127.0.0.1:26379">>],
redis_type => sentinel,
sentinel => <<"mymaster">>,
database => 1
},
values(common, "sentinel", SpecificOpts);
values("cluster", post) ->
SpecificOpts = #{
servers => [<<"127.0.0.1:6379">>],
redis_type => cluster
},
values(common, "cluster", SpecificOpts);
values(Protocol, put) ->
maps:without([type, name], values(Protocol, post)).
values(common, RedisType, SpecificOpts) ->
Config = #{
type => list_to_atom("redis_" ++ RedisType),
name => <<"redis_bridge">>,
enable => true,
local_topic => <<"local/topic/#">>,
pool_size => 8,
password => <<"******">>,
command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>],
resource_opts => values(resource_opts, RedisType, #{}),
ssl => #{enable => false}
},
maps:merge(Config, SpecificOpts);
values(resource_opts, "cluster", SpecificOpts) ->
SpecificOpts;
values(resource_opts, _RedisType, SpecificOpts) ->
maps:merge(
#{
batch_size => 1,
batch_time => <<"20ms">>
},
SpecificOpts
).
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_redis".
roots() -> [].
fields(action_parameters) ->
[
command_template(),
{redis_type,
?HOCON(
?ENUM([single, sentinel, cluster]), #{
required => false,
desc => ?DESC(redis_type),
hidden => true,
importance => ?IMPORTANCE_HIDDEN
}
)}
];
fields("post_single") ->
method_fields(post, redis_single);
fields("post_sentinel") ->
method_fields(post, redis_sentinel);
fields("post_cluster") ->
method_fields(post, redis_cluster);
fields("put_single") ->
method_fields(put, redis_single);
fields("put_sentinel") ->
method_fields(put, redis_sentinel);
fields("put_cluster") ->
method_fields(put, redis_cluster);
fields("get_single") ->
method_fields(get, redis_single);
fields("get_sentinel") ->
method_fields(get, redis_sentinel);
fields("get_cluster") ->
method_fields(get, redis_cluster);
%% old bridge v1 schema
fields(Type) when
Type == redis_single;
Type == redis_sentinel;
Type == redis_cluster
->
redis_bridge_common_fields(Type) ++
connector_fields(Type);
fields("creation_opts_" ++ Type) ->
resource_creation_fields(Type).
method_fields(post, ConnectorType) ->
redis_bridge_common_fields(ConnectorType) ++
connector_fields(ConnectorType) ++
type_name_fields(ConnectorType);
method_fields(get, ConnectorType) ->
redis_bridge_common_fields(ConnectorType) ++
connector_fields(ConnectorType) ++
type_name_fields(ConnectorType) ++
emqx_bridge_schema:status_fields();
method_fields(put, ConnectorType) ->
redis_bridge_common_fields(ConnectorType) ++
connector_fields(ConnectorType).
redis_bridge_common_fields(Type) ->
emqx_bridge_schema:common_bridge_fields() ++
[
{local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})},
command_template()
] ++
v1_resource_fields(Type).
connector_fields(Type) ->
emqx_redis:fields(Type).
type_name_fields(Type) ->
[
{type, mk(Type, #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].
v1_resource_fields(Type) ->
[
{resource_opts,
mk(
?R_REF("creation_opts_" ++ atom_to_list(Type)),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
resource_creation_fields("redis_cluster") ->
% TODO
% Cluster bridge is currently incompatible with batching.
Fields = emqx_resource_schema:fields("creation_opts"),
lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time, enable_batch]);
resource_creation_fields(_) ->
emqx_resource_schema:fields("creation_opts").
desc(action_parameters) ->
?DESC("desc_action_parameters");
desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Redis using `", string:to_upper(Method), "` method."];
desc(redis_single) ->
?DESC(emqx_redis, "single");
desc(redis_sentinel) ->
?DESC(emqx_redis, "sentinel");
desc(redis_cluster) ->
?DESC(emqx_redis, "cluster");
desc("creation_opts_" ++ _Type) ->
?DESC(emqx_resource_schema, "creation_opts");
desc(_) ->
undefined.
command_template(type) ->
list(binary());
command_template(required) ->
true;
command_template(validator) ->
fun is_command_template_valid/1;
command_template(desc) ->
?DESC("command_template");
command_template(_) ->
undefined.
is_command_template_valid(CommandSegments) ->
case
is_list(CommandSegments) andalso length(CommandSegments) > 0 andalso
lists:all(fun is_binary/1, CommandSegments)
of
true ->
ok;
false ->
{error,
"the value of the field 'command_template' should be a nonempty "
"list of strings (templates for Redis command and arguments)"}
end.
command_template() ->
{command_template, fun command_template/1}.