Merge pull request #12030 from zhongwencool/redis-connector-v2-540
Redis connector v2 540
This commit is contained in:
commit
41194cacc8
|
@ -1,7 +1,7 @@
|
|||
sentinel resolve-hostnames yes
|
||||
bind :: 0.0.0.0
|
||||
|
||||
sentinel monitor mymaster redis-sentinel-master 6379 1
|
||||
sentinel auth-pass mymaster public
|
||||
sentinel down-after-milliseconds mymaster 10000
|
||||
sentinel failover-timeout mymaster 20000
|
||||
sentinel monitor mytcpmaster redis-sentinel-master 6379 1
|
||||
sentinel auth-pass mytcpmaster public
|
||||
sentinel down-after-milliseconds mytcpmaster 10000
|
||||
sentinel failover-timeout mytcpmaster 20000
|
||||
|
|
|
@ -8,7 +8,7 @@ tls-key-file /etc/certs/key.pem
|
|||
tls-ca-cert-file /etc/certs/cacert.pem
|
||||
tls-auth-clients no
|
||||
|
||||
sentinel monitor mymaster redis-sentinel-tls-master 6389 1
|
||||
sentinel auth-pass mymaster public
|
||||
sentinel down-after-milliseconds mymaster 10000
|
||||
sentinel failover-timeout mymaster 20000
|
||||
sentinel monitor mytlsmaster redis-sentinel-tls-master 6389 1
|
||||
sentinel auth-pass mytlsmaster public
|
||||
sentinel down-after-milliseconds mytlsmaster 10000
|
||||
sentinel failover-timeout mytlsmaster 20000
|
||||
|
|
|
@ -64,12 +64,8 @@ refs(_) ->
|
|||
expected => "single | cluster | sentinel"
|
||||
}).
|
||||
|
||||
fields(redis_single) ->
|
||||
common_fields() ++ emqx_redis:fields(single);
|
||||
fields(redis_cluster) ->
|
||||
common_fields() ++ emqx_redis:fields(cluster);
|
||||
fields(redis_sentinel) ->
|
||||
common_fields() ++ emqx_redis:fields(sentinel).
|
||||
fields(Type) ->
|
||||
common_fields() ++ emqx_redis:fields(Type).
|
||||
|
||||
desc(redis_single) ->
|
||||
?DESC(single);
|
||||
|
|
|
@ -34,17 +34,9 @@ namespace() -> "authz".
|
|||
|
||||
type() -> ?AUTHZ_TYPE.
|
||||
|
||||
fields(redis_single) ->
|
||||
fields(Type) ->
|
||||
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
|
||||
emqx_redis:fields(single) ++
|
||||
[{cmd, cmd()}];
|
||||
fields(redis_sentinel) ->
|
||||
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
|
||||
emqx_redis:fields(sentinel) ++
|
||||
[{cmd, cmd()}];
|
||||
fields(redis_cluster) ->
|
||||
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
|
||||
emqx_redis:fields(cluster) ++
|
||||
emqx_redis:fields(Type) ++
|
||||
[{cmd, cmd()}].
|
||||
|
||||
desc(redis_single) ->
|
||||
|
|
|
@ -81,7 +81,8 @@ hard_coded_action_info_modules_ee() ->
|
|||
emqx_bridge_mongodb_action_info,
|
||||
emqx_bridge_pgsql_action_info,
|
||||
emqx_bridge_syskeeper_action_info,
|
||||
emqx_bridge_timescale_action_info
|
||||
emqx_bridge_timescale_action_info,
|
||||
emqx_bridge_redis_action_info
|
||||
].
|
||||
-else.
|
||||
hard_coded_action_info_modules_ee() ->
|
||||
|
|
|
@ -242,17 +242,17 @@ schema_homogeneous_test() ->
|
|||
is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
|
||||
Fields = Module:fields(TypeName),
|
||||
ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()),
|
||||
MissingFileds = lists:filter(
|
||||
MissingFields = lists:filter(
|
||||
fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
|
||||
),
|
||||
case MissingFileds of
|
||||
case MissingFields of
|
||||
[] ->
|
||||
false;
|
||||
_ ->
|
||||
{true, #{
|
||||
schema_module => Module,
|
||||
type_name => TypeName,
|
||||
missing_fields => MissingFileds
|
||||
missing_fields => MissingFields
|
||||
}}
|
||||
end.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_bridge_redis, [
|
||||
{description, "EMQX Enterprise Redis Bridge"},
|
||||
{vsn, "0.1.3"},
|
||||
{vsn, "0.1.4"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
@ -9,7 +9,9 @@
|
|||
emqx_resource,
|
||||
emqx_redis
|
||||
]},
|
||||
{env, []},
|
||||
{env, [
|
||||
{emqx_action_info_modules, [emqx_bridge_redis_action_info]}
|
||||
]},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
]}.
|
||||
|
|
|
@ -8,9 +8,9 @@
|
|||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
|
||||
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
]).
|
||||
-export([conn_bridge_examples/1]).
|
||||
|
||||
-export([type_name_fields/1, connector_fields/1]).
|
||||
|
||||
-export([
|
||||
namespace/0,
|
||||
|
@ -100,6 +100,8 @@ namespace() -> "bridge_redis".
|
|||
|
||||
roots() -> [].
|
||||
|
||||
fields(action_parameters) ->
|
||||
[{command_template, fun command_template/1}];
|
||||
fields("post_single") ->
|
||||
method_fields(post, redis_single);
|
||||
fields("post_sentinel") ->
|
||||
|
@ -142,21 +144,13 @@ method_fields(put, ConnectorType) ->
|
|||
redis_bridge_common_fields(Type) ->
|
||||
emqx_bridge_schema:common_bridge_fields() ++
|
||||
[
|
||||
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
||||
{command_template, fun command_template/1}
|
||||
{local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})}
|
||||
| fields(action_parameters)
|
||||
] ++
|
||||
resource_fields(Type).
|
||||
|
||||
connector_fields(Type) ->
|
||||
RedisType = bridge_type_to_redis_conn_type(Type),
|
||||
emqx_redis:fields(RedisType).
|
||||
|
||||
bridge_type_to_redis_conn_type(redis_single) ->
|
||||
single;
|
||||
bridge_type_to_redis_conn_type(redis_sentinel) ->
|
||||
sentinel;
|
||||
bridge_type_to_redis_conn_type(redis_cluster) ->
|
||||
cluster.
|
||||
emqx_redis:fields(Type).
|
||||
|
||||
type_name_fields(Type) ->
|
||||
[
|
||||
|
@ -168,7 +162,7 @@ resource_fields(Type) ->
|
|||
[
|
||||
{resource_opts,
|
||||
mk(
|
||||
ref("creation_opts_" ++ atom_to_list(Type)),
|
||||
?R_REF("creation_opts_" ++ atom_to_list(Type)),
|
||||
#{
|
||||
required => false,
|
||||
default => #{},
|
||||
|
@ -185,6 +179,8 @@ resource_creation_fields("redis_cluster") ->
|
|||
resource_creation_fields(_) ->
|
||||
emqx_resource_schema:fields("creation_opts").
|
||||
|
||||
desc(action_parameters) ->
|
||||
?DESC("desc_action_parameters");
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_redis_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-export([
|
||||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0,
|
||||
bridge_v1_config_to_action_config/2,
|
||||
connector_action_config_to_bridge_v1_config/2,
|
||||
bridge_v1_config_to_connector_config/1,
|
||||
bridge_v1_type_name_fun/1
|
||||
]).
|
||||
|
||||
-import(emqx_utils_conv, [bin/1]).
|
||||
|
||||
-define(SCHEMA_MODULE, emqx_bridge_redis_schema).
|
||||
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
|
||||
|
||||
action_type_name() -> redis.
|
||||
|
||||
connector_type_name() -> redis.
|
||||
|
||||
schema_module() -> ?SCHEMA_MODULE.
|
||||
|
||||
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||
fix_v1_type(
|
||||
maps:merge(
|
||||
maps:without(
|
||||
[<<"connector">>],
|
||||
map_unindent(<<"parameters">>, ActionConfig)
|
||||
),
|
||||
map_unindent(<<"parameters">>, ConnectorConfig)
|
||||
)
|
||||
).
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
|
||||
ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
|
||||
ActionConfig#{<<"connector">> => ConnectorName}.
|
||||
|
||||
bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
||||
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
|
||||
ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ConnectorTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields("config_connector")),
|
||||
%% Need put redis_type into parameter.
|
||||
%% cluster need type to filter resource_opts
|
||||
ConnectorKeys =
|
||||
(maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++
|
||||
[<<"redis_type">>],
|
||||
ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
|
||||
make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config).
|
||||
|
||||
%%------------------------------------------------------------------------------------------
|
||||
%% Internal helper fns
|
||||
%%------------------------------------------------------------------------------------------
|
||||
|
||||
bridge_v1_type_name() ->
|
||||
{fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
|
||||
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
|
||||
v1_type(Type).
|
||||
|
||||
fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) ->
|
||||
Conf#{<<"type">> => v1_type(RedisType)}.
|
||||
|
||||
v1_type(<<"single">>) -> redis_single;
|
||||
v1_type(<<"sentinel">>) -> redis_sentinel;
|
||||
v1_type(<<"cluster">>) -> redis_cluster.
|
||||
|
||||
bridge_v1_type_names() -> [redis_single, redis_sentinel, redis_cluster].
|
||||
|
||||
map_unindent(Key, Map) ->
|
||||
maps:merge(
|
||||
maps:get(Key, Map),
|
||||
maps:remove(Key, Map)
|
||||
).
|
||||
|
||||
map_indent(IndentKey, PickKeys, Map) ->
|
||||
maps:put(
|
||||
IndentKey,
|
||||
maps:with(PickKeys, Map),
|
||||
maps:without(PickKeys, Map)
|
||||
).
|
||||
|
||||
schema_keys(Schema) ->
|
||||
[bin(Key) || {Key, _} <- Schema].
|
||||
|
||||
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||
Conf0 = maps:with(PickKeys, Config),
|
||||
map_indent(<<"parameters">>, IndentKeys, Conf0).
|
|
@ -4,6 +4,7 @@
|
|||
-module(emqx_bridge_redis_connector).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
@ -11,11 +12,15 @@
|
|||
%% callbacks of behaviour emqx_resource
|
||||
-export([
|
||||
callback_mode/0,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_start/2,
|
||||
on_stop/2,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_get_channel_status/3
|
||||
]).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
@ -24,7 +29,34 @@
|
|||
|
||||
callback_mode() -> always_sync.
|
||||
|
||||
on_start(InstId, #{command_template := CommandTemplate} = Config) ->
|
||||
on_add_channel(
|
||||
_InstanceId,
|
||||
State = #{channels := Channels},
|
||||
ChannelId,
|
||||
#{
|
||||
parameters := #{
|
||||
command_template := Template
|
||||
}
|
||||
}
|
||||
) ->
|
||||
Channels2 = Channels#{
|
||||
ChannelId => #{template => preproc_command_template(Template)}
|
||||
},
|
||||
{ok, State#{channels => Channels2}}.
|
||||
|
||||
on_remove_channel(_InstanceId, State = #{channels := Channels}, ChannelId) ->
|
||||
{ok, State#{channels => maps:remove(ChannelId, Channels)}}.
|
||||
|
||||
on_get_channels(InstanceId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
||||
|
||||
on_get_channel_status(_ConnectorResId, ChannelId, #{channels := Channels}) ->
|
||||
case maps:is_key(ChannelId, Channels) of
|
||||
true -> ?status_connected;
|
||||
false -> ?status_disconnected
|
||||
end.
|
||||
|
||||
on_start(InstId, Config) ->
|
||||
case emqx_redis:on_start(InstId, Config) of
|
||||
{ok, RedisConnSt} ->
|
||||
?tp(
|
||||
|
@ -33,7 +65,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) ->
|
|||
),
|
||||
{ok, #{
|
||||
conn_st => RedisConnSt,
|
||||
command_template => preproc_command_template(CommandTemplate)
|
||||
channels => #{}
|
||||
}};
|
||||
{error, {start_pool_failed, _, #{type := authentication_error, reason := Reason}}} = Error ->
|
||||
?tp(
|
||||
|
@ -57,14 +89,8 @@ on_stop(InstId, undefined = _State) ->
|
|||
on_get_status(InstId, #{conn_st := RedisConnSt}) ->
|
||||
emqx_redis:on_get_status(InstId, RedisConnSt).
|
||||
|
||||
on_query(
|
||||
InstId,
|
||||
{send_message, Data},
|
||||
_State = #{
|
||||
command_template := CommandTemplate, conn_st := RedisConnSt
|
||||
}
|
||||
) ->
|
||||
Cmd = proc_command_template(CommandTemplate, Data),
|
||||
%% raw cmd without template, for CI test
|
||||
on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) ->
|
||||
?tp(
|
||||
redis_bridge_connector_cmd,
|
||||
#{cmd => Cmd, batch => false, mode => sync}
|
||||
|
@ -77,45 +103,68 @@ on_query(
|
|||
Result;
|
||||
on_query(
|
||||
InstId,
|
||||
Query,
|
||||
_State = #{conn_st := RedisConnSt}
|
||||
{_MessageTag, _Data} = Msg,
|
||||
#{channels := Channels, conn_st := RedisConnSt}
|
||||
) ->
|
||||
?tp(
|
||||
redis_bridge_connector_query,
|
||||
#{query => Query, batch => false, mode => sync}
|
||||
),
|
||||
Result = query(InstId, Query, RedisConnSt),
|
||||
?tp(
|
||||
redis_bridge_connector_send_done,
|
||||
#{query => Query, batch => false, mode => sync, result => Result}
|
||||
),
|
||||
Result.
|
||||
case try_render_message([Msg], Channels) of
|
||||
{ok, [Cmd]} ->
|
||||
?tp(
|
||||
redis_bridge_connector_cmd,
|
||||
#{cmd => Cmd, batch => false, mode => sync}
|
||||
),
|
||||
Result = query(InstId, {cmd, Cmd}, RedisConnSt),
|
||||
?tp(
|
||||
redis_bridge_connector_send_done,
|
||||
#{cmd => Cmd, batch => false, mode => sync, result => Result}
|
||||
),
|
||||
Result;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
on_batch_query(
|
||||
InstId, BatchData, _State = #{command_template := CommandTemplate, conn_st := RedisConnSt}
|
||||
InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt}
|
||||
) ->
|
||||
Cmds = process_batch_data(BatchData, CommandTemplate),
|
||||
?tp(
|
||||
redis_bridge_connector_send,
|
||||
#{batch_data => BatchData, batch => true, mode => sync}
|
||||
),
|
||||
Result = query(InstId, {cmds, Cmds}, RedisConnSt),
|
||||
?tp(
|
||||
redis_bridge_connector_send_done,
|
||||
#{
|
||||
batch_data => BatchData,
|
||||
batch_size => length(BatchData),
|
||||
batch => true,
|
||||
mode => sync,
|
||||
result => Result
|
||||
}
|
||||
),
|
||||
Result.
|
||||
case try_render_message(BatchData, Channels) of
|
||||
{ok, Cmds} ->
|
||||
?tp(
|
||||
redis_bridge_connector_send,
|
||||
#{batch_data => BatchData, batch => true, mode => sync}
|
||||
),
|
||||
Result = query(InstId, {cmds, Cmds}, RedisConnSt),
|
||||
?tp(
|
||||
redis_bridge_connector_send_done,
|
||||
#{
|
||||
batch_data => BatchData,
|
||||
batch_size => length(BatchData),
|
||||
batch => true,
|
||||
mode => sync,
|
||||
result => Result
|
||||
}
|
||||
),
|
||||
Result;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% private helpers
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
||||
try_render_message(Datas, Channels) ->
|
||||
try_render_message(Datas, Channels, []).
|
||||
|
||||
try_render_message([{MessageTag, Data} | T], Channels, Acc) ->
|
||||
case maps:find(MessageTag, Channels) of
|
||||
{ok, #{template := Template}} ->
|
||||
Msg = proc_command_template(Template, Data),
|
||||
try_render_message(T, Channels, [Msg | Acc]);
|
||||
_ ->
|
||||
{error, {unrecoverable_error, {invalid_message_tag, MessageTag}}}
|
||||
end;
|
||||
try_render_message([], _Channels, Acc) ->
|
||||
{ok, lists:reverse(Acc)}.
|
||||
|
||||
query(InstId, Query, RedisConnSt) ->
|
||||
case emqx_redis:on_query(InstId, Query, RedisConnSt) of
|
||||
{ok, _} = Ok -> Ok;
|
||||
|
@ -123,14 +172,6 @@ query(InstId, Query, RedisConnSt) ->
|
|||
{error, _} = Error -> Error
|
||||
end.
|
||||
|
||||
process_batch_data(BatchData, CommandTemplate) ->
|
||||
lists:map(
|
||||
fun({send_message, Data}) ->
|
||||
proc_command_template(CommandTemplate, Data)
|
||||
end,
|
||||
BatchData
|
||||
).
|
||||
|
||||
proc_command_template(CommandTemplate, Msg) ->
|
||||
lists:map(
|
||||
fun(ArgTks) ->
|
||||
|
|
|
@ -0,0 +1,276 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_redis_schema).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-define(TYPE, redis).
|
||||
|
||||
%% `hocon_schema' API
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1,
|
||||
resource_opts_converter/2
|
||||
]).
|
||||
|
||||
%% `emqx_bridge_v2_schema' "unofficial" API
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
conn_bridge_examples/1,
|
||||
connector_examples/1
|
||||
]).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `hocon_schema' API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
namespace() ->
|
||||
?TYPE.
|
||||
|
||||
roots() ->
|
||||
[].
|
||||
|
||||
%%=========================================
|
||||
%% Action fields
|
||||
%%=========================================
|
||||
fields("config_connector") ->
|
||||
emqx_connector_schema:common_fields() ++
|
||||
[
|
||||
{parameters,
|
||||
?HOCON(
|
||||
hoconsc:union([
|
||||
?R_REF(emqx_redis, redis_single_connector),
|
||||
?R_REF(emqx_redis, redis_sentinel_connector),
|
||||
?R_REF(emqx_redis, redis_cluster_connector)
|
||||
]),
|
||||
#{required => true, desc => ?DESC(redis_parameters)}
|
||||
)}
|
||||
] ++
|
||||
emqx_redis:redis_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields();
|
||||
fields(action) ->
|
||||
{?TYPE,
|
||||
?HOCON(
|
||||
?MAP(name, ?R_REF(redis_action)),
|
||||
#{
|
||||
desc => <<"Redis Action Config">>,
|
||||
converter => fun ?MODULE:resource_opts_converter/2,
|
||||
required => false
|
||||
}
|
||||
)};
|
||||
fields(redis_action) ->
|
||||
Schema =
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
?HOCON(
|
||||
?R_REF(emqx_bridge_redis, action_parameters),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(producer_action)
|
||||
}
|
||||
)
|
||||
),
|
||||
ResOpts =
|
||||
{resource_opts,
|
||||
?HOCON(
|
||||
?R_REF(resource_opts),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(emqx_resource_schema, resource_opts)
|
||||
}
|
||||
)},
|
||||
RedisType =
|
||||
{redis_type,
|
||||
?HOCON(
|
||||
?ENUM([single, sentinel, cluster]),
|
||||
#{required => true, desc => ?DESC(redis_type)}
|
||||
)},
|
||||
[RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)];
|
||||
fields(resource_opts) ->
|
||||
emqx_resource_schema:create_opts([
|
||||
{batch_size, #{desc => ?DESC(batch_size)}},
|
||||
{batch_time, #{desc => ?DESC(batch_time)}}
|
||||
]);
|
||||
%%=========================================
|
||||
%% HTTP API fields
|
||||
%%=========================================
|
||||
fields("post_connector") ->
|
||||
emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("config_connector");
|
||||
fields("put_connector") ->
|
||||
fields("config_connector");
|
||||
fields("get_connector") ->
|
||||
emqx_bridge_schema:status_fields() ++
|
||||
fields("post_connector");
|
||||
fields("get_bridge_v2") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
|
||||
fields("post_bridge_v2") ->
|
||||
emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_bridge_v2");
|
||||
fields("put_bridge_v2") ->
|
||||
fields(redis_action);
|
||||
fields("get_single") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("put_single");
|
||||
fields("put_single") ->
|
||||
fields("config_connector");
|
||||
fields("post_single") ->
|
||||
emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_single").
|
||||
|
||||
desc("config_connector") ->
|
||||
?DESC(emqx_bridge_redis, "desc_config");
|
||||
desc(redis_action) ->
|
||||
?DESC(redis_action);
|
||||
desc(resource_opts) ->
|
||||
?DESC(emqx_resource_schema, resource_opts);
|
||||
desc(_Name) ->
|
||||
undefined.
|
||||
|
||||
resource_opts_converter(undefined, _Opts) ->
|
||||
undefined;
|
||||
resource_opts_converter(Conf, _Opts) ->
|
||||
maps:map(
|
||||
fun(_Name, SubConf) ->
|
||||
case SubConf of
|
||||
#{<<"redis_type">> := <<"cluster">>} ->
|
||||
ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}),
|
||||
%% cluster don't support batch
|
||||
SubConf#{
|
||||
<<"resource_opts">> =>
|
||||
ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>}
|
||||
};
|
||||
_ ->
|
||||
SubConf
|
||||
end
|
||||
end,
|
||||
Conf
|
||||
).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `emqx_bridge_v2_schema' "unofficial" API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"redis_single_producer">> => #{
|
||||
summary => <<"Redis Single Producer Action">>,
|
||||
value => action_example(single, Method)
|
||||
}
|
||||
},
|
||||
#{
|
||||
<<"redis_sentinel_producer">> => #{
|
||||
summary => <<"Redis Sentinel Producer Action">>,
|
||||
value => action_example(sentinel, Method)
|
||||
}
|
||||
},
|
||||
#{
|
||||
<<"redis_cluster_producer">> => #{
|
||||
summary => <<"Redis Cluster Producer Action">>,
|
||||
value => action_example(cluster, Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"redis_single_producer">> => #{
|
||||
summary => <<"Redis Single Producer Connector">>,
|
||||
value => connector_example(single, Method)
|
||||
}
|
||||
},
|
||||
#{
|
||||
<<"redis_cluster_producer">> => #{
|
||||
summary => <<"Redis Cluster Producer Connector">>,
|
||||
value => connector_example(cluster, Method)
|
||||
}
|
||||
},
|
||||
#{
|
||||
<<"redis_sentinel_producer">> => #{
|
||||
summary => <<"Redis Sentinel Producer Connector">>,
|
||||
value => connector_example(sentinel, Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
emqx_bridge_redis:conn_bridge_examples(Method).
|
||||
|
||||
action_example(RedisType, post) ->
|
||||
maps:merge(
|
||||
action_example(RedisType, put),
|
||||
#{
|
||||
type => <<"redis">>,
|
||||
name => <<"my_action">>
|
||||
}
|
||||
);
|
||||
action_example(RedisType, get) ->
|
||||
maps:merge(
|
||||
action_example(RedisType, put),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
]
|
||||
}
|
||||
);
|
||||
action_example(RedisType, put) ->
|
||||
#{
|
||||
redis_type => RedisType,
|
||||
enable => true,
|
||||
connector => <<"my_connector_name">>,
|
||||
description => <<"My action">>,
|
||||
parameters => #{
|
||||
command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>]
|
||||
},
|
||||
resource_opts => #{batch_size => 1}
|
||||
}.
|
||||
|
||||
connector_example(RedisType, get) ->
|
||||
maps:merge(
|
||||
connector_example(RedisType, put),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
]
|
||||
}
|
||||
);
|
||||
connector_example(RedisType, post) ->
|
||||
maps:merge(
|
||||
connector_example(RedisType, put),
|
||||
#{
|
||||
type => <<"redis_single_producer">>,
|
||||
name => <<"my_connector">>
|
||||
}
|
||||
);
|
||||
connector_example(RedisType, put) ->
|
||||
#{
|
||||
enable => true,
|
||||
desc => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>,
|
||||
parameters => connector_parameter(RedisType),
|
||||
pool_size => 8,
|
||||
database => 1,
|
||||
username => <<"test">>,
|
||||
password => <<"******">>,
|
||||
auto_reconnect => true,
|
||||
ssl => #{enable => false}
|
||||
}.
|
||||
|
||||
connector_parameter(single) ->
|
||||
#{redis_type => single, server => <<"127.0.0.1:6379">>};
|
||||
connector_parameter(cluster) ->
|
||||
#{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>};
|
||||
connector_parameter(sentinel) ->
|
||||
#{
|
||||
redis_type => sentinel,
|
||||
servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
|
||||
sentinel => <<"myredismaster">>
|
||||
}.
|
|
@ -56,6 +56,7 @@
|
|||
).
|
||||
|
||||
all() -> [{group, transports}, {group, rest}].
|
||||
suite() -> [{timetrap, {minutes, 20}}].
|
||||
|
||||
groups() ->
|
||||
ResourceSpecificTCs = [t_create_delete_bridge],
|
||||
|
@ -143,15 +144,19 @@ redis_checks() ->
|
|||
end.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok = delete_all_bridges(),
|
||||
ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]),
|
||||
_ = application:stop(emqx_connector),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_Testcase, Config) ->
|
||||
init_per_testcase(Testcase, Config0) ->
|
||||
emqx_logger:set_log_level(debug),
|
||||
ok = delete_all_rules(),
|
||||
ok = delete_all_bridges(),
|
||||
ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
|
||||
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||
Name = <<(atom_to_binary(Testcase))/binary, UniqueNum/binary>>,
|
||||
Config = [{bridge_name, Name} | Config0],
|
||||
case {?config(connector_type, Config), ?config(batch_mode, Config)} of
|
||||
{undefined, _} ->
|
||||
Config;
|
||||
|
@ -165,7 +170,13 @@ init_per_testcase(_Testcase, Config) ->
|
|||
IsBatch = (BatchMode =:= batch_on),
|
||||
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
|
||||
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
|
||||
[{bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config]
|
||||
BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"),
|
||||
[
|
||||
{bridge_type, BridgeType},
|
||||
{bridge_config, BridgeConfig1},
|
||||
{is_batch, IsBatch}
|
||||
| Config
|
||||
]
|
||||
end.
|
||||
|
||||
end_per_testcase(_Testcase, Config) ->
|
||||
|
@ -173,10 +184,18 @@ end_per_testcase(_Testcase, Config) ->
|
|||
ProxyPort = ?config(proxy_port, Config),
|
||||
ok = snabbkaffe:stop(),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
ok = delete_all_bridges().
|
||||
ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors().
|
||||
|
||||
t_create_delete_bridge(Config) ->
|
||||
Name = <<"mybridge">>,
|
||||
Pid = erlang:whereis(eredis_sentinel),
|
||||
ct:pal("t_create_detele_bridge:~p~n", [
|
||||
#{
|
||||
config => Config,
|
||||
sentinel => Pid,
|
||||
eredis_sentinel => Pid =/= undefined andalso erlang:process_info(Pid)
|
||||
}
|
||||
]),
|
||||
Name = ?config(bridge_name, Config),
|
||||
Type = ?config(connector_type, Config),
|
||||
BridgeConfig = ?config(bridge_config, Config),
|
||||
IsBatch = ?config(is_batch, Config),
|
||||
|
@ -184,13 +203,11 @@ t_create_delete_bridge(Config) ->
|
|||
{ok, _},
|
||||
emqx_bridge:create(Type, Name, BridgeConfig)
|
||||
),
|
||||
|
||||
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
||||
|
||||
?WAIT(
|
||||
{ok, connected},
|
||||
emqx_resource:health_check(ResourceId),
|
||||
5
|
||||
10
|
||||
),
|
||||
|
||||
RedisType = atom_to_binary(Type),
|
||||
|
@ -244,7 +261,7 @@ t_check_values(_Config) ->
|
|||
).
|
||||
|
||||
t_check_replay(Config) ->
|
||||
Name = <<"toxic_bridge">>,
|
||||
Name = ?config(bridge_name, Config),
|
||||
Type = <<"redis_single">>,
|
||||
Topic = <<"local_topic/test">>,
|
||||
ProxyName = "redis_single_tcp",
|
||||
|
@ -324,15 +341,15 @@ t_permanent_error(_Config) ->
|
|||
),
|
||||
ok = emqx_bridge:remove(Type, Name).
|
||||
|
||||
t_auth_username_password(_Config) ->
|
||||
Name = <<"mybridge">>,
|
||||
t_auth_username_password(Config) ->
|
||||
Name = ?config(bridge_name, Config),
|
||||
Type = <<"redis_single">>,
|
||||
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
||||
BridgeConfig = username_password_redis_bridge_config(),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_bridge:create(Type, Name, BridgeConfig)
|
||||
),
|
||||
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
||||
?WAIT(
|
||||
{ok, connected},
|
||||
emqx_resource:health_check(ResourceId),
|
||||
|
@ -340,16 +357,16 @@ t_auth_username_password(_Config) ->
|
|||
),
|
||||
ok = emqx_bridge:remove(Type, Name).
|
||||
|
||||
t_auth_error_username_password(_Config) ->
|
||||
Name = <<"mybridge">>,
|
||||
t_auth_error_username_password(Config) ->
|
||||
Name = ?config(bridge_name, Config),
|
||||
Type = <<"redis_single">>,
|
||||
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
||||
BridgeConfig0 = username_password_redis_bridge_config(),
|
||||
BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_bridge:create(Type, Name, BridgeConfig)
|
||||
),
|
||||
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
||||
?WAIT(
|
||||
{ok, disconnected},
|
||||
emqx_resource:health_check(ResourceId),
|
||||
|
@ -361,16 +378,16 @@ t_auth_error_username_password(_Config) ->
|
|||
),
|
||||
ok = emqx_bridge:remove(Type, Name).
|
||||
|
||||
t_auth_error_password_only(_Config) ->
|
||||
Name = <<"mybridge">>,
|
||||
t_auth_error_password_only(Config) ->
|
||||
Name = ?config(bridge_name, Config),
|
||||
Type = <<"redis_single">>,
|
||||
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
||||
BridgeConfig0 = toxiproxy_redis_bridge_config(),
|
||||
BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_bridge:create(Type, Name, BridgeConfig)
|
||||
),
|
||||
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
||||
?assertEqual(
|
||||
{ok, disconnected},
|
||||
emqx_resource:health_check(ResourceId)
|
||||
|
@ -382,7 +399,7 @@ t_auth_error_password_only(_Config) ->
|
|||
ok = emqx_bridge:remove(Type, Name).
|
||||
|
||||
t_create_disconnected(Config) ->
|
||||
Name = <<"toxic_bridge">>,
|
||||
Name = ?config(bridge_name, Config),
|
||||
Type = <<"redis_single">>,
|
||||
|
||||
?check_trace(
|
||||
|
@ -450,10 +467,8 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
|
|||
added_msgs(ResourceId, BaseTopic, Payload) ->
|
||||
lists:flatmap(
|
||||
fun(K) ->
|
||||
{ok, Results} = emqx_resource:simple_sync_query(
|
||||
ResourceId,
|
||||
{cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}
|
||||
),
|
||||
Message = {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]},
|
||||
{ok, Results} = emqx_resource:simple_sync_query(ResourceId, Message),
|
||||
[El || El <- Results, El =:= Payload]
|
||||
end,
|
||||
[format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)]
|
||||
|
@ -482,14 +497,6 @@ delete_all_rules() ->
|
|||
emqx_rule_engine:get_rules()
|
||||
).
|
||||
|
||||
delete_all_bridges() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
emqx_bridge:remove(Type, Name)
|
||||
end,
|
||||
emqx_bridge:list()
|
||||
).
|
||||
|
||||
all_test_hosts() ->
|
||||
Confs = [
|
||||
?REDIS_TOXYPROXY_CONNECT_CONFIG
|
||||
|
@ -554,12 +561,12 @@ redis_connect_configs() ->
|
|||
tcp => #{
|
||||
<<"servers">> => <<"redis-sentinel:26379">>,
|
||||
<<"redis_type">> => <<"sentinel">>,
|
||||
<<"sentinel">> => <<"mymaster">>
|
||||
<<"sentinel">> => <<"mytcpmaster">>
|
||||
},
|
||||
tls => #{
|
||||
<<"servers">> => <<"redis-sentinel-tls:26380">>,
|
||||
<<"redis_type">> => <<"sentinel">>,
|
||||
<<"sentinel">> => <<"mymaster">>,
|
||||
<<"sentinel">> => <<"mytlsmaster">>,
|
||||
<<"ssl">> => redis_connect_ssl_opts(redis_sentinel)
|
||||
}
|
||||
},
|
||||
|
|
|
@ -9,6 +9,9 @@
|
|||
stdlib,
|
||||
ecpool,
|
||||
emqx_resource,
|
||||
eredis,
|
||||
%% eredis_cluster has supervisor should be started before emqx_connector
|
||||
%% otherwise the first start redis_cluster will fail.
|
||||
eredis_cluster,
|
||||
ehttpc,
|
||||
jose,
|
||||
|
|
|
@ -41,6 +41,8 @@ resource_type(syskeeper_proxy) ->
|
|||
emqx_bridge_syskeeper_proxy_server;
|
||||
resource_type(timescale) ->
|
||||
emqx_postgresql;
|
||||
resource_type(redis) ->
|
||||
emqx_bridge_redis_connector;
|
||||
resource_type(Type) ->
|
||||
error({unknown_connector_type, Type}).
|
||||
|
||||
|
@ -138,6 +140,14 @@ connector_structs() ->
|
|||
desc => <<"Matrix Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{redis,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
|
||||
#{
|
||||
desc => <<"Redis Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
||||
|
@ -164,7 +174,8 @@ schema_modules() ->
|
|||
emqx_bridge_syskeeper_connector,
|
||||
emqx_bridge_syskeeper_proxy,
|
||||
emqx_bridge_timescale,
|
||||
emqx_postgresql_connector_schema
|
||||
emqx_postgresql_connector_schema,
|
||||
emqx_bridge_redis_schema
|
||||
].
|
||||
|
||||
api_schemas(Method) ->
|
||||
|
@ -188,7 +199,8 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
|
||||
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
|
||||
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
|
||||
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector")
|
||||
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector")
|
||||
].
|
||||
|
||||
api_ref(Module, Type, Method) ->
|
||||
|
|
|
@ -71,16 +71,28 @@ enterprise_fields_connectors() -> [].
|
|||
|
||||
-endif.
|
||||
|
||||
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
|
||||
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
|
||||
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
|
||||
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
|
||||
connector_type_to_bridge_types(matrix) -> [matrix];
|
||||
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
|
||||
connector_type_to_bridge_types(pgsql) -> [pgsql];
|
||||
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
|
||||
connector_type_to_bridge_types(syskeeper_proxy) -> [];
|
||||
connector_type_to_bridge_types(timescale) -> [timescale].
|
||||
connector_type_to_bridge_types(azure_event_hub_producer) ->
|
||||
[azure_event_hub_producer];
|
||||
connector_type_to_bridge_types(confluent_producer) ->
|
||||
[confluent_producer];
|
||||
connector_type_to_bridge_types(gcp_pubsub_producer) ->
|
||||
[gcp_pubsub, gcp_pubsub_producer];
|
||||
connector_type_to_bridge_types(kafka_producer) ->
|
||||
[kafka, kafka_producer];
|
||||
connector_type_to_bridge_types(matrix) ->
|
||||
[matrix];
|
||||
connector_type_to_bridge_types(mongodb) ->
|
||||
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
|
||||
connector_type_to_bridge_types(pgsql) ->
|
||||
[pgsql];
|
||||
connector_type_to_bridge_types(syskeeper_forwarder) ->
|
||||
[syskeeper_forwarder];
|
||||
connector_type_to_bridge_types(syskeeper_proxy) ->
|
||||
[];
|
||||
connector_type_to_bridge_types(timescale) ->
|
||||
[timescale];
|
||||
connector_type_to_bridge_types(redis) ->
|
||||
[redis, redis_single, redis_sentinel, redis_cluster].
|
||||
|
||||
actions_config_name() -> <<"actions">>.
|
||||
|
||||
|
@ -125,7 +137,7 @@ split_bridge_to_connector_and_action(
|
|||
BridgeType, BridgeV1Conf
|
||||
);
|
||||
false ->
|
||||
%% We do an automatic transfomation to get the connector config
|
||||
%% We do an automatic transformation to get the connector config
|
||||
%% if the callback is not defined.
|
||||
%% Get connector fields from bridge config
|
||||
lists:foldl(
|
||||
|
|
|
@ -844,7 +844,7 @@ parse_object_loop(PropList0, Module, Options) ->
|
|||
),
|
||||
parse_object_loop(PropList, Module, Options, _Props = [], _Required = [], _Refs = []).
|
||||
|
||||
parse_object_loop([], _Modlue, _Options, Props, Required, Refs) ->
|
||||
parse_object_loop([], _Module, _Options, Props, Required, Refs) ->
|
||||
{lists:reverse(Props), lists:usort(Required), Refs};
|
||||
parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs) ->
|
||||
NameBin = to_bin(Name),
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-export([roots/0, fields/1]).
|
||||
-export([roots/0, fields/1, redis_fields/0, desc/1]).
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
||||
|
@ -50,55 +50,46 @@ roots() ->
|
|||
{config, #{
|
||||
type => hoconsc:union(
|
||||
[
|
||||
hoconsc:ref(?MODULE, cluster),
|
||||
hoconsc:ref(?MODULE, single),
|
||||
hoconsc:ref(?MODULE, sentinel)
|
||||
?R_REF(redis_cluster),
|
||||
?R_REF(redis_single),
|
||||
?R_REF(redis_sentinel)
|
||||
]
|
||||
)
|
||||
}}
|
||||
].
|
||||
|
||||
fields(single) ->
|
||||
[
|
||||
{server, server()},
|
||||
{redis_type, #{
|
||||
type => single,
|
||||
default => single,
|
||||
required => false,
|
||||
desc => ?DESC("single")
|
||||
}}
|
||||
] ++
|
||||
fields(redis_single) ->
|
||||
fields(redis_single_connector) ++
|
||||
redis_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields();
|
||||
fields(cluster) ->
|
||||
fields(redis_single_connector) ->
|
||||
[
|
||||
{servers, servers()},
|
||||
{redis_type, #{
|
||||
type => cluster,
|
||||
default => cluster,
|
||||
required => false,
|
||||
desc => ?DESC("cluster")
|
||||
}}
|
||||
] ++
|
||||
{server, server()},
|
||||
redis_type(single)
|
||||
];
|
||||
fields(redis_cluster) ->
|
||||
fields(redis_cluster_connector) ++
|
||||
lists:keydelete(database, 1, redis_fields()) ++
|
||||
emqx_connector_schema_lib:ssl_fields();
|
||||
fields(sentinel) ->
|
||||
fields(redis_cluster_connector) ->
|
||||
[
|
||||
{servers, servers()},
|
||||
{redis_type, #{
|
||||
type => sentinel,
|
||||
default => sentinel,
|
||||
required => false,
|
||||
desc => ?DESC("sentinel")
|
||||
}},
|
||||
redis_type(cluster)
|
||||
];
|
||||
fields(redis_sentinel) ->
|
||||
fields(redis_sentinel_connector) ++
|
||||
redis_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields();
|
||||
fields(redis_sentinel_connector) ->
|
||||
[
|
||||
{servers, servers()},
|
||||
redis_type(sentinel),
|
||||
{sentinel, #{
|
||||
type => string(),
|
||||
required => true,
|
||||
desc => ?DESC("sentinel_desc")
|
||||
}}
|
||||
] ++
|
||||
redis_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields().
|
||||
].
|
||||
|
||||
server() ->
|
||||
Meta = #{desc => ?DESC("server")},
|
||||
|
@ -108,64 +99,52 @@ servers() ->
|
|||
Meta = #{desc => ?DESC("servers")},
|
||||
emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS).
|
||||
|
||||
desc(redis_cluster_connector) ->
|
||||
?DESC(redis_cluster_connector);
|
||||
desc(redis_single_connector) ->
|
||||
?DESC(redis_single_connector);
|
||||
desc(redis_sentinel_connector) ->
|
||||
?DESC(redis_sentinel_connector);
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
%% ===================================================================
|
||||
|
||||
redis_type(Type) ->
|
||||
{redis_type, #{
|
||||
type => Type,
|
||||
default => Type,
|
||||
required => false,
|
||||
desc => ?DESC(Type)
|
||||
}}.
|
||||
|
||||
callback_mode() -> always_sync.
|
||||
|
||||
on_start(
|
||||
InstId,
|
||||
#{
|
||||
redis_type := Type,
|
||||
pool_size := PoolSize,
|
||||
ssl := SSL
|
||||
} = Config
|
||||
) ->
|
||||
on_start(InstId, Config0) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_redis_connector",
|
||||
connector => InstId,
|
||||
config => emqx_utils:redact(Config)
|
||||
config => emqx_utils:redact(Config0)
|
||||
}),
|
||||
ConfKey =
|
||||
case Type of
|
||||
single -> server;
|
||||
_ -> servers
|
||||
end,
|
||||
Servers0 = maps:get(ConfKey, Config),
|
||||
Servers1 = lists:map(
|
||||
fun(#{hostname := Host, port := Port}) ->
|
||||
{Host, Port}
|
||||
end,
|
||||
emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)
|
||||
),
|
||||
Servers = [{servers, Servers1}],
|
||||
Database =
|
||||
case Type of
|
||||
cluster -> [];
|
||||
_ -> [{database, maps:get(database, Config)}]
|
||||
end,
|
||||
Config = config(Config0),
|
||||
#{pool_size := PoolSize, ssl := SSL, redis_type := Type} = Config,
|
||||
Options = ssl_options(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
|
||||
Opts =
|
||||
[
|
||||
{pool_size, PoolSize},
|
||||
{username, maps:get(username, Config, undefined)},
|
||||
{password, maps:get(password, Config, "")},
|
||||
{servers, servers(Config)},
|
||||
{options, Options},
|
||||
{pool_size, PoolSize},
|
||||
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
|
||||
] ++ Database ++ Servers,
|
||||
Options =
|
||||
case maps:get(enable, SSL) of
|
||||
true ->
|
||||
[
|
||||
{ssl, true},
|
||||
{ssl_options, emqx_tls_lib:to_client_opts(SSL)}
|
||||
];
|
||||
false ->
|
||||
[{ssl, false}]
|
||||
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
|
||||
] ++ database(Config),
|
||||
|
||||
State = #{pool_name => InstId, type => Type},
|
||||
ok = emqx_resource:allocate_resource(InstId, type, Type),
|
||||
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
|
||||
case Type of
|
||||
cluster ->
|
||||
case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of
|
||||
case eredis_cluster:start_pool(InstId, Opts) of
|
||||
{ok, _} ->
|
||||
{ok, State};
|
||||
{ok, _, _} ->
|
||||
|
@ -174,7 +153,7 @@ on_start(
|
|||
{error, Reason}
|
||||
end;
|
||||
_ ->
|
||||
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of
|
||||
case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
|
@ -182,6 +161,14 @@ on_start(
|
|||
end
|
||||
end.
|
||||
|
||||
ssl_options(SSL = #{enable := true}) ->
|
||||
[
|
||||
{ssl, true},
|
||||
{ssl_options, emqx_tls_lib:to_client_opts(SSL)}
|
||||
];
|
||||
ssl_options(#{enable := false}) ->
|
||||
[{ssl, false}].
|
||||
|
||||
on_stop(InstId, _State) ->
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_redis_connector",
|
||||
|
@ -189,7 +176,11 @@ on_stop(InstId, _State) ->
|
|||
}),
|
||||
case emqx_resource:get_allocated_resources(InstId) of
|
||||
#{pool_name := PoolName, type := cluster} ->
|
||||
eredis_cluster:stop_pool(PoolName);
|
||||
case eredis_cluster:stop_pool(PoolName) of
|
||||
{error, not_found} -> ok;
|
||||
ok -> ok;
|
||||
Error -> Error
|
||||
end;
|
||||
#{pool_name := PoolName, type := _} ->
|
||||
emqx_resource_pool:stop(PoolName);
|
||||
_ ->
|
||||
|
@ -244,8 +235,17 @@ is_unrecoverable_error(_) ->
|
|||
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
|
||||
case eredis_cluster:pool_exists(PoolName) of
|
||||
true ->
|
||||
Health = eredis_cluster:ping_all(PoolName),
|
||||
status_result(Health);
|
||||
%% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
|
||||
%% we need restart eredis_cluster pool when pool_worker(slot) is empty.
|
||||
%% If the pool is empty, it means that there are no workers attempting to reconnect.
|
||||
%% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
|
||||
case eredis_cluster_monitor:get_all_pools(PoolName) of
|
||||
[] ->
|
||||
disconnected;
|
||||
[_ | _] ->
|
||||
Health = eredis_cluster:ping_all(PoolName),
|
||||
status_result(Health)
|
||||
end;
|
||||
false ->
|
||||
disconnected
|
||||
end;
|
||||
|
@ -289,6 +289,28 @@ wrap_qp_result(Results) when is_list(Results) ->
|
|||
end.
|
||||
|
||||
%% ===================================================================
|
||||
%% parameters for connector
|
||||
config(#{parameters := #{} = Param} = Config) ->
|
||||
maps:merge(maps:remove(parameters, Config), Param);
|
||||
%% is for authn/authz
|
||||
config(Config) ->
|
||||
Config.
|
||||
|
||||
servers(#{server := Server}) ->
|
||||
servers(Server);
|
||||
servers(#{servers := Servers}) ->
|
||||
servers(Servers);
|
||||
servers(Servers) ->
|
||||
lists:map(
|
||||
fun(#{hostname := Host, port := Port}) ->
|
||||
{Host, Port}
|
||||
end,
|
||||
emqx_schema:parse_servers(Servers, ?REDIS_HOST_OPTIONS)
|
||||
).
|
||||
|
||||
database(#{redis_type := cluster}) -> [];
|
||||
database(#{database := Database}) -> [{database, Database}].
|
||||
|
||||
connect(Opts) ->
|
||||
eredis:start_link(Opts).
|
||||
|
||||
|
|
|
@ -223,7 +223,7 @@ redis_config_base(Type, ServerKey) ->
|
|||
"sentinel" ->
|
||||
Host = ?REDIS_SENTINEL_HOST,
|
||||
Port = ?REDIS_SENTINEL_PORT,
|
||||
MaybeSentinel = " sentinel = mymaster\n",
|
||||
MaybeSentinel = " sentinel = mytcpmaster\n",
|
||||
MaybeDatabase = " database = 1\n";
|
||||
"single" ->
|
||||
Host = ?REDIS_SINGLE_HOST,
|
||||
|
|
|
@ -13,7 +13,11 @@
|
|||
uuid,
|
||||
emqx,
|
||||
emqx_utils,
|
||||
emqx_ctl
|
||||
emqx_ctl,
|
||||
%% rule_engine should wait for bridge connector start,
|
||||
%% it's will check action/connector ref's exist.
|
||||
emqx_bridge,
|
||||
emqx_connector
|
||||
]},
|
||||
{mod, {emqx_rule_engine_app, []}},
|
||||
{env, []},
|
||||
|
|
|
@ -32,14 +32,19 @@ desc_type.desc:
|
|||
desc_type.label:
|
||||
"""Bridge Type"""
|
||||
|
||||
local_topic.desc:
|
||||
desc_local_topic.desc:
|
||||
"""The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic
|
||||
matching the local_topic will be forwarded.<br/>
|
||||
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
|
||||
configured, then both the data got from the rule and the MQTT messages that match local_topic
|
||||
will be forwarded."""
|
||||
|
||||
local_topic.label:
|
||||
desc_local_topic.label:
|
||||
"""Local Topic"""
|
||||
|
||||
desc_action_parameters.desc:
|
||||
"""The parameters of the action."""
|
||||
desc_action_parameters.label:
|
||||
"""Action Parameters"""
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
emqx_bridge_redis_schema {
|
||||
redis_parameters.label:
|
||||
"""Redis Type Specific Parameters"""
|
||||
|
||||
redis_parameters.desc:
|
||||
"""Set of parameters specific for the given type of this Redis connector, `redis_type` can be one of `single`, `cluster` or `sentinel`."""
|
||||
|
||||
producer_action.desc:
|
||||
"""The parameters of the action."""
|
||||
producer_action.label:
|
||||
"""Action Parameters"""
|
||||
|
||||
redis_type.label:
|
||||
"""Redis Type"""
|
||||
redis_type.desc:
|
||||
"""Single mode. Must be set to 'single' when Redis server is running in single mode.
|
||||
Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode.
|
||||
Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode."""
|
||||
|
||||
batch_size.label:
|
||||
"""Batch Size"""
|
||||
batch_size.desc:
|
||||
"""This parameter defines the upper limit of the batch count.
|
||||
Setting this value to 1 effectively disables batching, as it indicates that only one item will be processed per batch.
|
||||
Note on Redis Cluster Mode:
|
||||
In the context of Redis Cluster Mode, it is important to note that batching is not supported.
|
||||
Consequently, the batch_size is always set to 1,
|
||||
reflecting the mode inherent limitation in handling batch operations."""
|
||||
|
||||
batch_time.desc:
|
||||
"""Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage."""
|
||||
|
||||
batch_time.label:
|
||||
"""Max batch wait time, disable when in Redis Cluster Mode."""
|
||||
|
||||
redis_action.label:
|
||||
"""Redis Action"""
|
||||
redis_action.desc:
|
||||
"""Action to interact with a Redis connector."""
|
||||
|
||||
}
|
|
@ -1,5 +1,11 @@
|
|||
emqx_connector_schema {
|
||||
|
||||
config_enable.desc:
|
||||
"""Enable (true) or disable (false) this connector."""
|
||||
|
||||
config_enable.label:
|
||||
"""Enable or Disable"""
|
||||
|
||||
desc_connectors.desc:
|
||||
"""Connectors that are used to connect to external systems"""
|
||||
desc_connectors.label:
|
||||
|
|
|
@ -47,4 +47,18 @@ single.desc:
|
|||
single.label:
|
||||
"""Single Mode"""
|
||||
|
||||
redis_cluster_connector.label:
|
||||
"""Redis Cluster Connector"""
|
||||
redis_cluster_connector.desc:
|
||||
"""Redis connector in cluster mode"""
|
||||
|
||||
redis_sentinel_connector.label:
|
||||
"""Redis Sentinel Connector"""
|
||||
redis_sentinel_connector.desc:
|
||||
"""Redis connector in sentinel mode"""
|
||||
|
||||
redis_single_connector.label:
|
||||
"""Redis Single Connector"""
|
||||
redis_single_connector.desc:
|
||||
"""Redis connector in sentinel mode"""
|
||||
}
|
||||
|
|
|
@ -27,11 +27,21 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
|
|||
|
||||
I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)"
|
||||
|
||||
DOWNLOAD_I18N_TRANSLATIONS=${DOWNLOAD_I18N_TRANSLATIONS:-true}
|
||||
# download desc (i18n) translations
|
||||
curl -L --silent --show-error \
|
||||
--output "apps/emqx_dashboard/priv/desc.zh.hocon" \
|
||||
"https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon"
|
||||
if [ "$DOWNLOAD_I18N_TRANSLATIONS" = "true" ]; then
|
||||
echo "downloading i18n translation from emqx/emqx-i18n"
|
||||
start=$(date +%s)
|
||||
curl -L --silent --show-error \
|
||||
--output "apps/emqx_dashboard/priv/desc.zh.hocon" \
|
||||
"https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon"
|
||||
end=$(date +%s)
|
||||
duration=$(echo "$end $start" | awk '{print $1 - $2}')
|
||||
echo "downloaded i18n translation in $duration seconds, set DOWNLOAD_I18N_TRANSLATIONS=false to skip"
|
||||
else
|
||||
echo "skipping to download i18n translation from emqx/emqx-i18n, set DOWNLOAD_I18N_TRANSLATIONS=true to update"
|
||||
fi
|
||||
|
||||
# TODO
|
||||
# make sbom a build artifcat
|
||||
# make sbom a build artifact
|
||||
# ./scripts/update-bom.sh "$PROFILE_STR" ./rel
|
||||
|
|
Loading…
Reference in New Issue