feat: redis bridge v2

This commit is contained in:
zhongwencool 2023-11-23 11:00:07 +08:00
parent 8ec3857f4d
commit 0aec2f7605
24 changed files with 764 additions and 226 deletions

View File

@ -1,7 +1,7 @@
sentinel resolve-hostnames yes sentinel resolve-hostnames yes
bind :: 0.0.0.0 bind :: 0.0.0.0
sentinel monitor mymaster redis-sentinel-master 6379 1 sentinel monitor mytcpmaster redis-sentinel-master 6379 1
sentinel auth-pass mymaster public sentinel auth-pass mytcpmaster public
sentinel down-after-milliseconds mymaster 10000 sentinel down-after-milliseconds mytcpmaster 10000
sentinel failover-timeout mymaster 20000 sentinel failover-timeout mytcpmaster 20000

View File

@ -8,7 +8,7 @@ tls-key-file /etc/certs/key.pem
tls-ca-cert-file /etc/certs/cacert.pem tls-ca-cert-file /etc/certs/cacert.pem
tls-auth-clients no tls-auth-clients no
sentinel monitor mymaster redis-sentinel-tls-master 6389 1 sentinel monitor mytlsmaster redis-sentinel-tls-master 6389 1
sentinel auth-pass mymaster public sentinel auth-pass mytlsmaster public
sentinel down-after-milliseconds mymaster 10000 sentinel down-after-milliseconds mytlsmaster 10000
sentinel failover-timeout mymaster 20000 sentinel failover-timeout mytlsmaster 20000

View File

@ -64,12 +64,8 @@ refs(_) ->
expected => "single | cluster | sentinel" expected => "single | cluster | sentinel"
}). }).
fields(redis_single) -> fields(Type) ->
common_fields() ++ emqx_redis:fields(single); common_fields() ++ emqx_redis:fields(Type).
fields(redis_cluster) ->
common_fields() ++ emqx_redis:fields(cluster);
fields(redis_sentinel) ->
common_fields() ++ emqx_redis:fields(sentinel).
desc(redis_single) -> desc(redis_single) ->
?DESC(single); ?DESC(single);

View File

@ -34,17 +34,9 @@ namespace() -> "authz".
type() -> ?AUTHZ_TYPE. type() -> ?AUTHZ_TYPE.
fields(redis_single) -> fields(Type) ->
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
emqx_redis:fields(single) ++ emqx_redis:fields(Type) ++
[{cmd, cmd()}];
fields(redis_sentinel) ->
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
emqx_redis:fields(sentinel) ++
[{cmd, cmd()}];
fields(redis_cluster) ->
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
emqx_redis:fields(cluster) ++
[{cmd, cmd()}]. [{cmd, cmd()}].
desc(redis_single) -> desc(redis_single) ->

View File

@ -81,7 +81,8 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_mongodb_action_info, emqx_bridge_mongodb_action_info,
emqx_bridge_pgsql_action_info, emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info, emqx_bridge_syskeeper_action_info,
emqx_bridge_timescale_action_info emqx_bridge_timescale_action_info,
emqx_bridge_redis_action_info
]. ].
-else. -else.
hard_coded_action_info_modules_ee() -> hard_coded_action_info_modules_ee() ->

View File

@ -242,17 +242,17 @@ schema_homogeneous_test() ->
is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
Fields = Module:fields(TypeName), Fields = Module:fields(TypeName),
ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()), ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()),
MissingFileds = lists:filter( MissingFields = lists:filter(
fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
), ),
case MissingFileds of case MissingFields of
[] -> [] ->
false; false;
_ -> _ ->
{true, #{ {true, #{
schema_module => Module, schema_module => Module,
type_name => TypeName, type_name => TypeName,
missing_fields => MissingFileds missing_fields => MissingFields
}} }}
end. end.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_redis, [ {application, emqx_bridge_redis, [
{description, "EMQX Enterprise Redis Bridge"}, {description, "EMQX Enterprise Redis Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
@ -9,7 +9,9 @@
emqx_resource, emqx_resource,
emqx_redis emqx_redis
]}, ]},
{env, []}, {env, [
{emqx_action_info_modules, [emqx_bridge_redis_action_info]}
]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -8,9 +8,9 @@
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([ -export([conn_bridge_examples/1]).
conn_bridge_examples/1
]). -export([type_name_fields/1, connector_fields/1]).
-export([ -export([
namespace/0, namespace/0,
@ -100,6 +100,8 @@ namespace() -> "bridge_redis".
roots() -> []. roots() -> [].
fields(action_parameters) ->
[{command_template, fun command_template/1}];
fields("post_single") -> fields("post_single") ->
method_fields(post, redis_single); method_fields(post, redis_single);
fields("post_sentinel") -> fields("post_sentinel") ->
@ -142,21 +144,13 @@ method_fields(put, ConnectorType) ->
redis_bridge_common_fields(Type) -> redis_bridge_common_fields(Type) ->
emqx_bridge_schema:common_bridge_fields() ++ emqx_bridge_schema:common_bridge_fields() ++
[ [
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})}
{command_template, fun command_template/1} | fields(action_parameters)
] ++ ] ++
resource_fields(Type). resource_fields(Type).
connector_fields(Type) -> connector_fields(Type) ->
RedisType = bridge_type_to_redis_conn_type(Type), emqx_redis:fields(Type).
emqx_redis:fields(RedisType).
bridge_type_to_redis_conn_type(redis_single) ->
single;
bridge_type_to_redis_conn_type(redis_sentinel) ->
sentinel;
bridge_type_to_redis_conn_type(redis_cluster) ->
cluster.
type_name_fields(Type) -> type_name_fields(Type) ->
[ [
@ -168,7 +162,7 @@ resource_fields(Type) ->
[ [
{resource_opts, {resource_opts,
mk( mk(
ref("creation_opts_" ++ atom_to_list(Type)), ?R_REF("creation_opts_" ++ atom_to_list(Type)),
#{ #{
required => false, required => false,
default => #{}, default => #{},
@ -185,6 +179,8 @@ resource_creation_fields("redis_cluster") ->
resource_creation_fields(_) -> resource_creation_fields(_) ->
emqx_resource_schema:fields("creation_opts"). emqx_resource_schema:fields("creation_opts").
desc(action_parameters) ->
?DESC("desc_action_parameters");
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->

View File

@ -0,0 +1,98 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_redis_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
bridge_v1_config_to_action_config/2,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_connector_config/1,
bridge_v1_type_name_fun/1
]).
-import(emqx_utils_conv, [bin/1]).
-define(SCHEMA_MODULE, emqx_bridge_redis_schema).
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
action_type_name() -> redis.
connector_type_name() -> redis.
schema_module() -> ?SCHEMA_MODULE.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
fix_v1_type(
maps:merge(
maps:without(
[<<"connector">>],
map_unindent(<<"parameters">>, ActionConfig)
),
map_unindent(<<"parameters">>, ConnectorConfig)
)
).
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
ActionConfig#{<<"connector">> => ConnectorName}.
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ConnectorTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields("config_connector")),
%% Need put redis_type into parameter.
%% cluster need type to filter resource_opts
ConnectorKeys =
(maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++
[<<"redis_type">>],
ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config).
%%------------------------------------------------------------------------------------------
%% Internal helper fns
%%------------------------------------------------------------------------------------------
bridge_v1_type_name() ->
{fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
v1_type(Type).
fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) ->
Conf#{<<"type">> => v1_type(RedisType)}.
v1_type(<<"single">>) -> redis_single;
v1_type(<<"sentinel">>) -> redis_sentinel;
v1_type(<<"cluster">>) -> redis_cluster.
bridge_v1_type_names() -> [redis_single, redis_sentinel, redis_cluster].
map_unindent(Key, Map) ->
maps:merge(
maps:get(Key, Map),
maps:remove(Key, Map)
).
map_indent(IndentKey, PickKeys, Map) ->
maps:put(
IndentKey,
maps:with(PickKeys, Map),
maps:without(PickKeys, Map)
).
schema_keys(Schema) ->
[bin(Key) || {Key, _} <- Schema].
make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),
map_indent(<<"parameters">>, IndentKeys, Conf0).

View File

@ -4,6 +4,7 @@
-module(emqx_bridge_redis_connector). -module(emqx_bridge_redis_connector).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource). -behaviour(emqx_resource).
@ -11,11 +12,15 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0, callback_mode/0,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_get_status/2 on_get_status/2,
on_get_channel_status/3
]). ]).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
@ -24,7 +29,34 @@
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, #{command_template := CommandTemplate} = Config) -> on_add_channel(
_InstanceId,
State = #{channels := Channels},
ChannelId,
#{
parameters := #{
command_template := Template
}
}
) ->
Channels2 = Channels#{
ChannelId => #{template => preproc_command_template(Template)}
},
{ok, State#{channels => Channels2}}.
on_remove_channel(_InstanceId, State = #{channels := Channels}, ChannelId) ->
{ok, State#{channels => maps:remove(ChannelId, Channels)}}.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
on_get_channel_status(_ConnectorResId, ChannelId, #{channels := Channels}) ->
case maps:is_key(ChannelId, Channels) of
true -> ?status_connected;
false -> ?status_disconnected
end.
on_start(InstId, Config) ->
case emqx_redis:on_start(InstId, Config) of case emqx_redis:on_start(InstId, Config) of
{ok, RedisConnSt} -> {ok, RedisConnSt} ->
?tp( ?tp(
@ -33,7 +65,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) ->
), ),
{ok, #{ {ok, #{
conn_st => RedisConnSt, conn_st => RedisConnSt,
command_template => preproc_command_template(CommandTemplate) channels => #{}
}}; }};
{error, {start_pool_failed, _, #{type := authentication_error, reason := Reason}}} = Error -> {error, {start_pool_failed, _, #{type := authentication_error, reason := Reason}}} = Error ->
?tp( ?tp(
@ -57,14 +89,8 @@ on_stop(InstId, undefined = _State) ->
on_get_status(InstId, #{conn_st := RedisConnSt}) -> on_get_status(InstId, #{conn_st := RedisConnSt}) ->
emqx_redis:on_get_status(InstId, RedisConnSt). emqx_redis:on_get_status(InstId, RedisConnSt).
on_query( %% raw cmd without template, for CI test
InstId, on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) ->
{send_message, Data},
_State = #{
command_template := CommandTemplate, conn_st := RedisConnSt
}
) ->
Cmd = proc_command_template(CommandTemplate, Data),
?tp( ?tp(
redis_bridge_connector_cmd, redis_bridge_connector_cmd,
#{cmd => Cmd, batch => false, mode => sync} #{cmd => Cmd, batch => false, mode => sync}
@ -77,24 +103,30 @@ on_query(
Result; Result;
on_query( on_query(
InstId, InstId,
Query, {_MessageTag, _Data} = Msg,
_State = #{conn_st := RedisConnSt} #{channels := Channels, conn_st := RedisConnSt}
) -> ) ->
case try_render_message([Msg], Channels) of
{ok, [Cmd]} ->
?tp( ?tp(
redis_bridge_connector_query, redis_bridge_connector_cmd,
#{query => Query, batch => false, mode => sync} #{cmd => Cmd, batch => false, mode => sync}
), ),
Result = query(InstId, Query, RedisConnSt), Result = query(InstId, {cmd, Cmd}, RedisConnSt),
?tp( ?tp(
redis_bridge_connector_send_done, redis_bridge_connector_send_done,
#{query => Query, batch => false, mode => sync, result => Result} #{cmd => Cmd, batch => false, mode => sync, result => Result}
), ),
Result. Result;
Error ->
Error
end.
on_batch_query( on_batch_query(
InstId, BatchData, _State = #{command_template := CommandTemplate, conn_st := RedisConnSt} InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt}
) -> ) ->
Cmds = process_batch_data(BatchData, CommandTemplate), case try_render_message(BatchData, Channels) of
{ok, Cmds} ->
?tp( ?tp(
redis_bridge_connector_send, redis_bridge_connector_send,
#{batch_data => BatchData, batch => true, mode => sync} #{batch_data => BatchData, batch => true, mode => sync}
@ -110,12 +142,29 @@ on_batch_query(
result => Result result => Result
} }
), ),
Result. Result;
Error ->
Error
end.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% private helpers %% private helpers
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
try_render_message(Datas, Channels) ->
try_render_message(Datas, Channels, []).
try_render_message([{MessageTag, Data} | T], Channels, Acc) ->
case maps:find(MessageTag, Channels) of
{ok, #{template := Template}} ->
Msg = proc_command_template(Template, Data),
try_render_message(T, Channels, [Msg | Acc]);
_ ->
{error, {unrecoverable_error, {invalid_message_tag, MessageTag}}}
end;
try_render_message([], _Channels, Acc) ->
{ok, lists:reverse(Acc)}.
query(InstId, Query, RedisConnSt) -> query(InstId, Query, RedisConnSt) ->
case emqx_redis:on_query(InstId, Query, RedisConnSt) of case emqx_redis:on_query(InstId, Query, RedisConnSt) of
{ok, _} = Ok -> Ok; {ok, _} = Ok -> Ok;
@ -123,14 +172,6 @@ query(InstId, Query, RedisConnSt) ->
{error, _} = Error -> Error {error, _} = Error -> Error
end. end.
process_batch_data(BatchData, CommandTemplate) ->
lists:map(
fun({send_message, Data}) ->
proc_command_template(CommandTemplate, Data)
end,
BatchData
).
proc_command_template(CommandTemplate, Msg) -> proc_command_template(CommandTemplate, Msg) ->
lists:map( lists:map(
fun(ArgTks) -> fun(ArgTks) ->

View File

@ -0,0 +1,276 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_redis_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-define(TYPE, redis).
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1,
resource_opts_converter/2
]).
%% `emqx_bridge_v2_schema' "unofficial" API
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
?TYPE.
roots() ->
[].
%%=========================================
%% Action fields
%%=========================================
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
[
{parameters,
?HOCON(
hoconsc:union([
?R_REF(emqx_redis, redis_single_connector),
?R_REF(emqx_redis, redis_sentinel_connector),
?R_REF(emqx_redis, redis_cluster_connector)
]),
#{required => true, desc => ?DESC(redis_parameters)}
)}
] ++
emqx_redis:redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(action) ->
{?TYPE,
?HOCON(
?MAP(name, ?R_REF(redis_action)),
#{
desc => <<"Redis Action Config">>,
converter => fun ?MODULE:resource_opts_converter/2,
required => false
}
)};
fields(redis_action) ->
Schema =
emqx_bridge_v2_schema:make_producer_action_schema(
?HOCON(
?R_REF(emqx_bridge_redis, action_parameters),
#{
required => true,
desc => ?DESC(producer_action)
}
)
),
ResOpts =
{resource_opts,
?HOCON(
?R_REF(resource_opts),
#{
required => true,
desc => ?DESC(emqx_resource_schema, resource_opts)
}
)},
RedisType =
{redis_type,
?HOCON(
?ENUM([single, sentinel, cluster]),
#{required => true, desc => ?DESC(redis_type)}
)},
[RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)];
fields(resource_opts) ->
emqx_resource_schema:create_opts([
{batch_size, #{desc => ?DESC(batch_size)}},
{batch_time, #{desc => ?DESC(batch_time)}}
]);
%%=========================================
%% HTTP API fields
%%=========================================
fields("post_connector") ->
emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("config_connector");
fields("put_connector") ->
fields("config_connector");
fields("get_connector") ->
emqx_bridge_schema:status_fields() ++
fields("post_connector");
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
fields("post_bridge_v2") ->
emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_bridge_v2");
fields("put_bridge_v2") ->
fields(redis_action);
fields("get_single") ->
emqx_bridge_schema:status_fields() ++ fields("put_single");
fields("put_single") ->
fields("config_connector");
fields("post_single") ->
emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_single").
desc("config_connector") ->
?DESC(emqx_bridge_redis, "desc_config");
desc(redis_action) ->
?DESC(redis_action);
desc(resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
resource_opts_converter(undefined, _Opts) ->
undefined;
resource_opts_converter(Conf, _Opts) ->
maps:map(
fun(_Name, SubConf) ->
case SubConf of
#{<<"redis_type">> := <<"cluster">>} ->
ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}),
%% cluster don't support batch
SubConf#{
<<"resource_opts">> =>
ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>}
};
_ ->
SubConf
end
end,
Conf
).
%%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API
%%-------------------------------------------------------------------------------------------------
bridge_v2_examples(Method) ->
[
#{
<<"redis_single_producer">> => #{
summary => <<"Redis Single Producer Action">>,
value => action_example(single, Method)
}
},
#{
<<"redis_sentinel_producer">> => #{
summary => <<"Redis Sentinel Producer Action">>,
value => action_example(sentinel, Method)
}
},
#{
<<"redis_cluster_producer">> => #{
summary => <<"Redis Cluster Producer Action">>,
value => action_example(cluster, Method)
}
}
].
connector_examples(Method) ->
[
#{
<<"redis_single_producer">> => #{
summary => <<"Redis Single Producer Connector">>,
value => connector_example(single, Method)
}
},
#{
<<"redis_cluster_producer">> => #{
summary => <<"Redis Cluster Producer Connector">>,
value => connector_example(cluster, Method)
}
},
#{
<<"redis_sentinel_producer">> => #{
summary => <<"Redis Sentinel Producer Connector">>,
value => connector_example(sentinel, Method)
}
}
].
conn_bridge_examples(Method) ->
emqx_bridge_redis:conn_bridge_examples(Method).
action_example(RedisType, post) ->
maps:merge(
action_example(RedisType, put),
#{
type => <<"redis">>,
name => <<"my_action">>
}
);
action_example(RedisType, get) ->
maps:merge(
action_example(RedisType, put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
action_example(RedisType, put) ->
#{
redis_type => RedisType,
enable => true,
connector => <<"my_connector_name">>,
description => <<"My action">>,
parameters => #{
command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>]
},
resource_opts => #{batch_size => 1}
}.
connector_example(RedisType, get) ->
maps:merge(
connector_example(RedisType, put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
connector_example(RedisType, post) ->
maps:merge(
connector_example(RedisType, put),
#{
type => <<"redis_single_producer">>,
name => <<"my_connector">>
}
);
connector_example(RedisType, put) ->
#{
enable => true,
desc => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>,
parameters => connector_parameter(RedisType),
pool_size => 8,
database => 1,
username => <<"test">>,
password => <<"******">>,
auto_reconnect => true,
ssl => #{enable => false}
}.
connector_parameter(single) ->
#{redis_type => single, server => <<"127.0.0.1:6379">>};
connector_parameter(cluster) ->
#{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>};
connector_parameter(sentinel) ->
#{
redis_type => sentinel,
servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
sentinel => <<"myredismaster">>
}.

View File

@ -56,6 +56,7 @@
). ).
all() -> [{group, transports}, {group, rest}]. all() -> [{group, transports}, {group, rest}].
suite() -> [{timetrap, {minutes, 20}}].
groups() -> groups() ->
ResourceSpecificTCs = [t_create_delete_bridge], ResourceSpecificTCs = [t_create_delete_bridge],
@ -143,15 +144,19 @@ redis_checks() ->
end. end.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = delete_all_bridges(), ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]), ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(Testcase, Config0) ->
emqx_logger:set_log_level(debug),
ok = delete_all_rules(), ok = delete_all_rules(),
ok = delete_all_bridges(), ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<(atom_to_binary(Testcase))/binary, UniqueNum/binary>>,
Config = [{bridge_name, Name} | Config0],
case {?config(connector_type, Config), ?config(batch_mode, Config)} of case {?config(connector_type, Config), ?config(batch_mode, Config)} of
{undefined, _} -> {undefined, _} ->
Config; Config;
@ -165,7 +170,13 @@ init_per_testcase(_Testcase, Config) ->
IsBatch = (BatchMode =:= batch_on), IsBatch = (BatchMode =:= batch_on),
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS), BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig}, BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
[{bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config] BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"),
[
{bridge_type, BridgeType},
{bridge_config, BridgeConfig1},
{is_batch, IsBatch}
| Config
]
end. end.
end_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) ->
@ -173,10 +184,18 @@ end_per_testcase(_Testcase, Config) ->
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = delete_all_bridges(). ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors().
t_create_delete_bridge(Config) -> t_create_delete_bridge(Config) ->
Name = <<"mybridge">>, Pid = erlang:whereis(eredis_sentinel),
ct:pal("t_create_detele_bridge:~p~n", [
#{
config => Config,
sentinel => Pid,
eredis_sentinel => Pid =/= undefined andalso erlang:process_info(Pid)
}
]),
Name = ?config(bridge_name, Config),
Type = ?config(connector_type, Config), Type = ?config(connector_type, Config),
BridgeConfig = ?config(bridge_config, Config), BridgeConfig = ?config(bridge_config, Config),
IsBatch = ?config(is_batch, Config), IsBatch = ?config(is_batch, Config),
@ -184,13 +203,11 @@ t_create_delete_bridge(Config) ->
{ok, _}, {ok, _},
emqx_bridge:create(Type, Name, BridgeConfig) emqx_bridge:create(Type, Name, BridgeConfig)
), ),
ResourceId = emqx_bridge_resource:resource_id(Type, Name), ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?WAIT( ?WAIT(
{ok, connected}, {ok, connected},
emqx_resource:health_check(ResourceId), emqx_resource:health_check(ResourceId),
5 10
), ),
RedisType = atom_to_binary(Type), RedisType = atom_to_binary(Type),
@ -244,7 +261,7 @@ t_check_values(_Config) ->
). ).
t_check_replay(Config) -> t_check_replay(Config) ->
Name = <<"toxic_bridge">>, Name = ?config(bridge_name, Config),
Type = <<"redis_single">>, Type = <<"redis_single">>,
Topic = <<"local_topic/test">>, Topic = <<"local_topic/test">>,
ProxyName = "redis_single_tcp", ProxyName = "redis_single_tcp",
@ -324,15 +341,15 @@ t_permanent_error(_Config) ->
), ),
ok = emqx_bridge:remove(Type, Name). ok = emqx_bridge:remove(Type, Name).
t_auth_username_password(_Config) -> t_auth_username_password(Config) ->
Name = <<"mybridge">>, Name = ?config(bridge_name, Config),
Type = <<"redis_single">>, Type = <<"redis_single">>,
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeConfig = username_password_redis_bridge_config(), BridgeConfig = username_password_redis_bridge_config(),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_bridge:create(Type, Name, BridgeConfig) emqx_bridge:create(Type, Name, BridgeConfig)
), ),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?WAIT( ?WAIT(
{ok, connected}, {ok, connected},
emqx_resource:health_check(ResourceId), emqx_resource:health_check(ResourceId),
@ -340,16 +357,16 @@ t_auth_username_password(_Config) ->
), ),
ok = emqx_bridge:remove(Type, Name). ok = emqx_bridge:remove(Type, Name).
t_auth_error_username_password(_Config) -> t_auth_error_username_password(Config) ->
Name = <<"mybridge">>, Name = ?config(bridge_name, Config),
Type = <<"redis_single">>, Type = <<"redis_single">>,
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeConfig0 = username_password_redis_bridge_config(), BridgeConfig0 = username_password_redis_bridge_config(),
BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}), BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_bridge:create(Type, Name, BridgeConfig) emqx_bridge:create(Type, Name, BridgeConfig)
), ),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?WAIT( ?WAIT(
{ok, disconnected}, {ok, disconnected},
emqx_resource:health_check(ResourceId), emqx_resource:health_check(ResourceId),
@ -361,16 +378,16 @@ t_auth_error_username_password(_Config) ->
), ),
ok = emqx_bridge:remove(Type, Name). ok = emqx_bridge:remove(Type, Name).
t_auth_error_password_only(_Config) -> t_auth_error_password_only(Config) ->
Name = <<"mybridge">>, Name = ?config(bridge_name, Config),
Type = <<"redis_single">>, Type = <<"redis_single">>,
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeConfig0 = toxiproxy_redis_bridge_config(), BridgeConfig0 = toxiproxy_redis_bridge_config(),
BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}), BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_bridge:create(Type, Name, BridgeConfig) emqx_bridge:create(Type, Name, BridgeConfig)
), ),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?assertEqual( ?assertEqual(
{ok, disconnected}, {ok, disconnected},
emqx_resource:health_check(ResourceId) emqx_resource:health_check(ResourceId)
@ -382,7 +399,7 @@ t_auth_error_password_only(_Config) ->
ok = emqx_bridge:remove(Type, Name). ok = emqx_bridge:remove(Type, Name).
t_create_disconnected(Config) -> t_create_disconnected(Config) ->
Name = <<"toxic_bridge">>, Name = ?config(bridge_name, Config),
Type = <<"redis_single">>, Type = <<"redis_single">>,
?check_trace( ?check_trace(
@ -450,10 +467,8 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
added_msgs(ResourceId, BaseTopic, Payload) -> added_msgs(ResourceId, BaseTopic, Payload) ->
lists:flatmap( lists:flatmap(
fun(K) -> fun(K) ->
{ok, Results} = emqx_resource:simple_sync_query( Message = {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]},
ResourceId, {ok, Results} = emqx_resource:simple_sync_query(ResourceId, Message),
{cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}
),
[El || El <- Results, El =:= Payload] [El || El <- Results, El =:= Payload]
end, end,
[format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)] [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)]
@ -482,14 +497,6 @@ delete_all_rules() ->
emqx_rule_engine:get_rules() emqx_rule_engine:get_rules()
). ).
delete_all_bridges() ->
lists:foreach(
fun(#{name := Name, type := Type}) ->
emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
).
all_test_hosts() -> all_test_hosts() ->
Confs = [ Confs = [
?REDIS_TOXYPROXY_CONNECT_CONFIG ?REDIS_TOXYPROXY_CONNECT_CONFIG
@ -554,12 +561,12 @@ redis_connect_configs() ->
tcp => #{ tcp => #{
<<"servers">> => <<"redis-sentinel:26379">>, <<"servers">> => <<"redis-sentinel:26379">>,
<<"redis_type">> => <<"sentinel">>, <<"redis_type">> => <<"sentinel">>,
<<"sentinel">> => <<"mymaster">> <<"sentinel">> => <<"mytcpmaster">>
}, },
tls => #{ tls => #{
<<"servers">> => <<"redis-sentinel-tls:26380">>, <<"servers">> => <<"redis-sentinel-tls:26380">>,
<<"redis_type">> => <<"sentinel">>, <<"redis_type">> => <<"sentinel">>,
<<"sentinel">> => <<"mymaster">>, <<"sentinel">> => <<"mytlsmaster">>,
<<"ssl">> => redis_connect_ssl_opts(redis_sentinel) <<"ssl">> => redis_connect_ssl_opts(redis_sentinel)
} }
}, },

View File

@ -9,6 +9,9 @@
stdlib, stdlib,
ecpool, ecpool,
emqx_resource, emqx_resource,
eredis,
%% eredis_cluster has supervisor should be started before emqx_connector
%% otherwise the first start redis_cluster will fail.
eredis_cluster, eredis_cluster,
ehttpc, ehttpc,
jose, jose,

View File

@ -41,6 +41,8 @@ resource_type(syskeeper_proxy) ->
emqx_bridge_syskeeper_proxy_server; emqx_bridge_syskeeper_proxy_server;
resource_type(timescale) -> resource_type(timescale) ->
emqx_postgresql; emqx_postgresql;
resource_type(redis) ->
emqx_bridge_redis_connector;
resource_type(Type) -> resource_type(Type) ->
error({unknown_connector_type, Type}). error({unknown_connector_type, Type}).
@ -138,6 +140,14 @@ connector_structs() ->
desc => <<"Matrix Connector Config">>, desc => <<"Matrix Connector Config">>,
required => false required => false
} }
)},
{redis,
mk(
hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
#{
desc => <<"Redis Connector Config">>,
required => false
}
)} )}
]. ].
@ -164,7 +174,8 @@ schema_modules() ->
emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy, emqx_bridge_syskeeper_proxy,
emqx_bridge_timescale, emqx_bridge_timescale,
emqx_postgresql_connector_schema emqx_postgresql_connector_schema,
emqx_bridge_redis_schema
]. ].
api_schemas(Method) -> api_schemas(Method) ->
@ -188,7 +199,8 @@ api_schemas(Method) ->
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector") api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector")
]. ].
api_ref(Module, Type, Method) -> api_ref(Module, Type, Method) ->

View File

@ -71,16 +71,28 @@ enterprise_fields_connectors() -> [].
-endif. -endif.
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; connector_type_to_bridge_types(azure_event_hub_producer) ->
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; [azure_event_hub_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; connector_type_to_bridge_types(confluent_producer) ->
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; [confluent_producer];
connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(gcp_pubsub_producer) ->
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(pgsql) -> [pgsql]; connector_type_to_bridge_types(kafka_producer) ->
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; [kafka, kafka_producer];
connector_type_to_bridge_types(syskeeper_proxy) -> []; connector_type_to_bridge_types(matrix) ->
connector_type_to_bridge_types(timescale) -> [timescale]. [matrix];
connector_type_to_bridge_types(mongodb) ->
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(pgsql) ->
[pgsql];
connector_type_to_bridge_types(syskeeper_forwarder) ->
[syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) ->
[];
connector_type_to_bridge_types(timescale) ->
[timescale];
connector_type_to_bridge_types(redis) ->
[redis, redis_single, redis_sentinel, redis_cluster].
actions_config_name() -> <<"actions">>. actions_config_name() -> <<"actions">>.
@ -125,7 +137,7 @@ split_bridge_to_connector_and_action(
BridgeType, BridgeV1Conf BridgeType, BridgeV1Conf
); );
false -> false ->
%% We do an automatic transfomation to get the connector config %% We do an automatic transformation to get the connector config
%% if the callback is not defined. %% if the callback is not defined.
%% Get connector fields from bridge config %% Get connector fields from bridge config
lists:foldl( lists:foldl(

View File

@ -844,7 +844,7 @@ parse_object_loop(PropList0, Module, Options) ->
), ),
parse_object_loop(PropList, Module, Options, _Props = [], _Required = [], _Refs = []). parse_object_loop(PropList, Module, Options, _Props = [], _Required = [], _Refs = []).
parse_object_loop([], _Modlue, _Options, Props, Required, Refs) -> parse_object_loop([], _Module, _Options, Props, Required, Refs) ->
{lists:reverse(Props), lists:usort(Required), Refs}; {lists:reverse(Props), lists:usort(Required), Refs};
parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs) -> parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs) ->
NameBin = to_bin(Name), NameBin = to_bin(Name),

View File

@ -20,7 +20,7 @@
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-export([roots/0, fields/1]). -export([roots/0, fields/1, redis_fields/0, desc/1]).
-behaviour(emqx_resource). -behaviour(emqx_resource).
@ -50,55 +50,46 @@ roots() ->
{config, #{ {config, #{
type => hoconsc:union( type => hoconsc:union(
[ [
hoconsc:ref(?MODULE, cluster), ?R_REF(redis_cluster),
hoconsc:ref(?MODULE, single), ?R_REF(redis_single),
hoconsc:ref(?MODULE, sentinel) ?R_REF(redis_sentinel)
] ]
) )
}} }}
]. ].
fields(single) -> fields(redis_single) ->
[ fields(redis_single_connector) ++
{server, server()},
{redis_type, #{
type => single,
default => single,
required => false,
desc => ?DESC("single")
}}
] ++
redis_fields() ++ redis_fields() ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(cluster) -> fields(redis_single_connector) ->
[ [
{servers, servers()}, {server, server()},
{redis_type, #{ redis_type(single)
type => cluster, ];
default => cluster, fields(redis_cluster) ->
required => false, fields(redis_cluster_connector) ++
desc => ?DESC("cluster")
}}
] ++
lists:keydelete(database, 1, redis_fields()) ++ lists:keydelete(database, 1, redis_fields()) ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(sentinel) -> fields(redis_cluster_connector) ->
[ [
{servers, servers()}, {servers, servers()},
{redis_type, #{ redis_type(cluster)
type => sentinel, ];
default => sentinel, fields(redis_sentinel) ->
required => false, fields(redis_sentinel_connector) ++
desc => ?DESC("sentinel") redis_fields() ++
}}, emqx_connector_schema_lib:ssl_fields();
fields(redis_sentinel_connector) ->
[
{servers, servers()},
redis_type(sentinel),
{sentinel, #{ {sentinel, #{
type => string(), type => string(),
required => true, required => true,
desc => ?DESC("sentinel_desc") desc => ?DESC("sentinel_desc")
}} }}
] ++ ].
redis_fields() ++
emqx_connector_schema_lib:ssl_fields().
server() -> server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},
@ -108,64 +99,52 @@ servers() ->
Meta = #{desc => ?DESC("servers")}, Meta = #{desc => ?DESC("servers")},
emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS).
desc(redis_cluster_connector) ->
?DESC(redis_cluster_connector);
desc(redis_single_connector) ->
?DESC(redis_single_connector);
desc(redis_sentinel_connector) ->
?DESC(redis_sentinel_connector);
desc(_) ->
undefined.
%% =================================================================== %% ===================================================================
redis_type(Type) ->
{redis_type, #{
type => Type,
default => Type,
required => false,
desc => ?DESC(Type)
}}.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start( on_start(InstId, Config0) ->
InstId,
#{
redis_type := Type,
pool_size := PoolSize,
ssl := SSL
} = Config
) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_redis_connector", msg => "starting_redis_connector",
connector => InstId, connector => InstId,
config => emqx_utils:redact(Config) config => emqx_utils:redact(Config0)
}), }),
ConfKey = Config = config(Config0),
case Type of #{pool_size := PoolSize, ssl := SSL, redis_type := Type} = Config,
single -> server; Options = ssl_options(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
_ -> servers
end,
Servers0 = maps:get(ConfKey, Config),
Servers1 = lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)
),
Servers = [{servers, Servers1}],
Database =
case Type of
cluster -> [];
_ -> [{database, maps:get(database, Config)}]
end,
Opts = Opts =
[ [
{pool_size, PoolSize},
{username, maps:get(username, Config, undefined)}, {username, maps:get(username, Config, undefined)},
{password, maps:get(password, Config, "")}, {password, maps:get(password, Config, "")},
{servers, servers(Config)},
{options, Options},
{pool_size, PoolSize},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL} {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
] ++ Database ++ Servers, ] ++ database(Config),
Options =
case maps:get(enable, SSL) of
true ->
[
{ssl, true},
{ssl_options, emqx_tls_lib:to_client_opts(SSL)}
];
false ->
[{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
State = #{pool_name => InstId, type => Type}, State = #{pool_name => InstId, type => Type},
ok = emqx_resource:allocate_resource(InstId, type, Type), ok = emqx_resource:allocate_resource(InstId, type, Type),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case Type of case Type of
cluster -> cluster ->
case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of case eredis_cluster:start_pool(InstId, Opts) of
{ok, _} -> {ok, _} ->
{ok, State}; {ok, State};
{ok, _, _} -> {ok, _, _} ->
@ -174,7 +153,7 @@ on_start(
{error, Reason} {error, Reason}
end; end;
_ -> _ ->
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -182,6 +161,14 @@ on_start(
end end
end. end.
ssl_options(SSL = #{enable := true}) ->
[
{ssl, true},
{ssl_options, emqx_tls_lib:to_client_opts(SSL)}
];
ssl_options(#{enable := false}) ->
[{ssl, false}].
on_stop(InstId, _State) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_redis_connector", msg => "stopping_redis_connector",
@ -189,7 +176,11 @@ on_stop(InstId, _State) ->
}), }),
case emqx_resource:get_allocated_resources(InstId) of case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName, type := cluster} -> #{pool_name := PoolName, type := cluster} ->
eredis_cluster:stop_pool(PoolName); case eredis_cluster:stop_pool(PoolName) of
{error, not_found} -> ok;
ok -> ok;
Error -> Error
end;
#{pool_name := PoolName, type := _} -> #{pool_name := PoolName, type := _} ->
emqx_resource_pool:stop(PoolName); emqx_resource_pool:stop(PoolName);
_ -> _ ->
@ -244,8 +235,17 @@ is_unrecoverable_error(_) ->
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
case eredis_cluster:pool_exists(PoolName) of case eredis_cluster:pool_exists(PoolName) of
true -> true ->
%% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
%% we need restart eredis_cluster pool when pool_worker(slot) is empty.
%% If the pool is empty, it means that there are no workers attempting to reconnect.
%% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
case eredis_cluster_monitor:get_all_pools(PoolName) of
[] ->
disconnected;
[_ | _] ->
Health = eredis_cluster:ping_all(PoolName), Health = eredis_cluster:ping_all(PoolName),
status_result(Health); status_result(Health)
end;
false -> false ->
disconnected disconnected
end; end;
@ -289,6 +289,28 @@ wrap_qp_result(Results) when is_list(Results) ->
end. end.
%% =================================================================== %% ===================================================================
%% parameters for connector
config(#{parameters := #{} = Param} = Config) ->
maps:merge(maps:remove(parameters, Config), Param);
%% is for authn/authz
config(Config) ->
Config.
servers(#{server := Server}) ->
servers(Server);
servers(#{servers := Servers}) ->
servers(Servers);
servers(Servers) ->
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers, ?REDIS_HOST_OPTIONS)
).
database(#{redis_type := cluster}) -> [];
database(#{database := Database}) -> [{database, Database}].
connect(Opts) -> connect(Opts) ->
eredis:start_link(Opts). eredis:start_link(Opts).

View File

@ -223,7 +223,7 @@ redis_config_base(Type, ServerKey) ->
"sentinel" -> "sentinel" ->
Host = ?REDIS_SENTINEL_HOST, Host = ?REDIS_SENTINEL_HOST,
Port = ?REDIS_SENTINEL_PORT, Port = ?REDIS_SENTINEL_PORT,
MaybeSentinel = " sentinel = mymaster\n", MaybeSentinel = " sentinel = mytcpmaster\n",
MaybeDatabase = " database = 1\n"; MaybeDatabase = " database = 1\n";
"single" -> "single" ->
Host = ?REDIS_SINGLE_HOST, Host = ?REDIS_SINGLE_HOST,

View File

@ -13,7 +13,11 @@
uuid, uuid,
emqx, emqx,
emqx_utils, emqx_utils,
emqx_ctl emqx_ctl,
%% rule_engine should wait for bridge connector start,
%% it's will check action/connector ref's exist.
emqx_bridge,
emqx_connector
]}, ]},
{mod, {emqx_rule_engine_app, []}}, {mod, {emqx_rule_engine_app, []}},
{env, []}, {env, []},

View File

@ -32,14 +32,19 @@ desc_type.desc:
desc_type.label: desc_type.label:
"""Bridge Type""" """Bridge Type"""
local_topic.desc: desc_local_topic.desc:
"""The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic """The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic
matching the local_topic will be forwarded.<br/> matching the local_topic will be forwarded.<br/>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded.""" will be forwarded."""
local_topic.label: desc_local_topic.label:
"""Local Topic""" """Local Topic"""
desc_action_parameters.desc:
"""The parameters of the action."""
desc_action_parameters.label:
"""Action Parameters"""
} }

View File

@ -0,0 +1,41 @@
emqx_bridge_redis_schema {
redis_parameters.label:
"""Redis Type Specific Parameters"""
redis_parameters.desc:
"""Set of parameters specific for the given type of this Redis connector, `redis_type` can be one of `single`, `cluster` or `sentinel`."""
producer_action.desc:
"""The parameters of the action."""
producer_action.label:
"""Action Parameters"""
redis_type.label:
"""Redis Type"""
redis_type.desc:
"""Single mode. Must be set to 'single' when Redis server is running in single mode.
Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode.
Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode."""
batch_size.label:
"""Batch Size"""
batch_size.desc:
"""This parameter defines the upper limit of the batch count.
Setting this value to 1 effectively disables batching, as it indicates that only one item will be processed per batch.
Note on Redis Cluster Mode:
In the context of Redis Cluster Mode, it is important to note that batching is not supported.
Consequently, the batch_size is always set to 1,
reflecting the mode inherent limitation in handling batch operations."""
batch_time.desc:
"""Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage."""
batch_time.label:
"""Max batch wait time, disable when in Redis Cluster Mode."""
redis_action.label:
"""Redis Action"""
redis_action.desc:
"""Action to interact with a Redis connector."""
}

View File

@ -1,5 +1,11 @@
emqx_connector_schema { emqx_connector_schema {
config_enable.desc:
"""Enable (true) or disable (false) this connector."""
config_enable.label:
"""Enable or Disable"""
desc_connectors.desc: desc_connectors.desc:
"""Connectors that are used to connect to external systems""" """Connectors that are used to connect to external systems"""
desc_connectors.label: desc_connectors.label:

View File

@ -47,4 +47,18 @@ single.desc:
single.label: single.label:
"""Single Mode""" """Single Mode"""
redis_cluster_connector.label:
"""Redis Cluster Connector"""
redis_cluster_connector.desc:
"""Redis connector in cluster mode"""
redis_sentinel_connector.label:
"""Redis Sentinel Connector"""
redis_sentinel_connector.desc:
"""Redis connector in sentinel mode"""
redis_single_connector.label:
"""Redis Single Connector"""
redis_single_connector.desc:
"""Redis connector in sentinel mode"""
} }

View File

@ -27,10 +27,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)" I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)"
UPDATE_I18N=${UPDATE_I18N:-true}
# download desc (i18n) translations # download desc (i18n) translations
if [ "$UPDATE_I18N" = "true" ]; then
echo "updating i18n file from emqx-i18n repo"
start=$(date +%s)
curl -L --silent --show-error \ curl -L --silent --show-error \
--output "apps/emqx_dashboard/priv/desc.zh.hocon" \ --output "apps/emqx_dashboard/priv/desc.zh.hocon" \
"https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon" "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon"
end=$(date +%s)
duration=$(echo "$end $start" | awk '{print $1 - $2}')
echo "updated i18n file using $duration seconds, set UPDATE_I18N=false to skip"
else
echo "skipping update i18n file from emqx-i18n repo, set UPDATE_I18N=true to update"
fi
# TODO # TODO
# make sbom a build artifcat # make sbom a build artifcat