Merge remote-tracking branch 'origin/release-54' into 1201-sync-release-54

This commit is contained in:
Zaiming (Stone) Shi 2023-12-01 08:32:22 +01:00
commit 7f4d91d490
34 changed files with 807 additions and 253 deletions

View File

@ -1,7 +1,7 @@
sentinel resolve-hostnames yes
bind :: 0.0.0.0
sentinel monitor mymaster redis-sentinel-master 6379 1
sentinel auth-pass mymaster public
sentinel down-after-milliseconds mymaster 10000
sentinel failover-timeout mymaster 20000
sentinel monitor mytcpmaster redis-sentinel-master 6379 1
sentinel auth-pass mytcpmaster public
sentinel down-after-milliseconds mytcpmaster 10000
sentinel failover-timeout mytcpmaster 20000

View File

@ -8,7 +8,7 @@ tls-key-file /etc/certs/key.pem
tls-ca-cert-file /etc/certs/cacert.pem
tls-auth-clients no
sentinel monitor mymaster redis-sentinel-tls-master 6389 1
sentinel auth-pass mymaster public
sentinel down-after-milliseconds mymaster 10000
sentinel failover-timeout mymaster 20000
sentinel monitor mytlsmaster redis-sentinel-tls-master 6389 1
sentinel auth-pass mytlsmaster public
sentinel down-after-milliseconds mytlsmaster 10000
sentinel failover-timeout mytlsmaster 20000

View File

@ -45,7 +45,7 @@
{meck, "0.9.2"},
{proper, "1.4.0"},
{bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.6"}}}
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}
]},
{extra_src_dirs, [{"test", [recursive]},
{"integration_test", [recursive]}]}
@ -55,7 +55,7 @@
{meck, "0.9.2"},
{proper, "1.4.0"},
{bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.6"}}}
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}
]},
{extra_src_dirs, [{"test", [recursive]}]}
]}

View File

@ -24,7 +24,7 @@ IsQuicSupp = fun() ->
end,
Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}.
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.303"}}}.
Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

View File

@ -184,7 +184,7 @@ peer_send_aborted(Stream, ErrorCode, S) ->
-spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret().
peer_send_shutdown(Stream, undefined, S) ->
ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
_ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
{ok, S}.
-spec send_complete(stream_handle(), boolean(), cb_data()) -> cb_ret().

View File

@ -669,22 +669,21 @@ t_multi_streams_packet_malform(Config) ->
case quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>) of
{ok, 10} -> ok;
{error, cancelled} -> ok;
{error, stm_send_error, aborted} -> ok
{error, stm_send_error, aborted} -> ok;
{error, closed} -> ok
end,
?assert(is_list(emqtt:info(C))),
{error, stm_send_error, _} =
{error, closed} =
snabbkaffe:retry(
10000,
10,
fun() ->
{error, stm_send_error, _} = quicer:send(
{error, closed} = quicer:send(
MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>
)
end
),
?assert(is_list(emqtt:info(C))),
ok = emqtt:disconnect(C).
@ -770,9 +769,9 @@ t_multi_streams_packet_too_large(Config) ->
timeout = recv_pub(1),
?assert(is_list(emqtt:info(C))),
%% Connection could be kept
{error, stm_send_error, _} = quicer:send(via_stream(PubVia), <<1>>),
{error, stm_send_error, _} = quicer:send(via_stream(PubVia2), <<1>>),
%% Connection could be kept but data stream are closed!
{error, closed} = quicer:send(via_stream(PubVia), <<1>>),
{error, closed} = quicer:send(via_stream(PubVia2), <<1>>),
%% We could send data over new stream
{ok, PubVia3} = emqtt:start_data_stream(C, []),
ok = emqtt:publish_async(

View File

@ -64,12 +64,8 @@ refs(_) ->
expected => "single | cluster | sentinel"
}).
fields(redis_single) ->
common_fields() ++ emqx_redis:fields(single);
fields(redis_cluster) ->
common_fields() ++ emqx_redis:fields(cluster);
fields(redis_sentinel) ->
common_fields() ++ emqx_redis:fields(sentinel).
fields(Type) ->
common_fields() ++ emqx_redis:fields(Type).
desc(redis_single) ->
?DESC(single);

View File

@ -34,17 +34,9 @@ namespace() -> "authz".
type() -> ?AUTHZ_TYPE.
fields(redis_single) ->
fields(Type) ->
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
emqx_redis:fields(single) ++
[{cmd, cmd()}];
fields(redis_sentinel) ->
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
emqx_redis:fields(sentinel) ++
[{cmd, cmd()}];
fields(redis_cluster) ->
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
emqx_redis:fields(cluster) ++
emqx_redis:fields(Type) ++
[{cmd, cmd()}].
desc(redis_single) ->

View File

@ -81,7 +81,8 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_mongodb_action_info,
emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info,
emqx_bridge_timescale_action_info
emqx_bridge_timescale_action_info,
emqx_bridge_redis_action_info
].
-else.
hard_coded_action_info_modules_ee() ->

View File

@ -242,17 +242,17 @@ schema_homogeneous_test() ->
is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
Fields = Module:fields(TypeName),
ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()),
MissingFileds = lists:filter(
MissingFields = lists:filter(
fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
),
case MissingFileds of
case MissingFields of
[] ->
false;
_ ->
{true, #{
schema_module => Module,
type_name => TypeName,
missing_fields => MissingFileds
missing_fields => MissingFields
}}
end.

View File

@ -9,7 +9,9 @@
emqx_resource,
emqx_redis
]},
{env, []},
{env, [
{emqx_action_info_modules, [emqx_bridge_redis_action_info]}
]},
{modules, []},
{links, []}
]}.

View File

@ -8,9 +8,9 @@
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([
conn_bridge_examples/1
]).
-export([conn_bridge_examples/1]).
-export([type_name_fields/1, connector_fields/1]).
-export([
namespace/0,
@ -100,6 +100,8 @@ namespace() -> "bridge_redis".
roots() -> [].
fields(action_parameters) ->
[{command_template, fun command_template/1}];
fields("post_single") ->
method_fields(post, redis_single);
fields("post_sentinel") ->
@ -142,21 +144,13 @@ method_fields(put, ConnectorType) ->
redis_bridge_common_fields(Type) ->
emqx_bridge_schema:common_bridge_fields() ++
[
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{command_template, fun command_template/1}
{local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})}
| fields(action_parameters)
] ++
resource_fields(Type).
connector_fields(Type) ->
RedisType = bridge_type_to_redis_conn_type(Type),
emqx_redis:fields(RedisType).
bridge_type_to_redis_conn_type(redis_single) ->
single;
bridge_type_to_redis_conn_type(redis_sentinel) ->
sentinel;
bridge_type_to_redis_conn_type(redis_cluster) ->
cluster.
emqx_redis:fields(Type).
type_name_fields(Type) ->
[
@ -168,7 +162,7 @@ resource_fields(Type) ->
[
{resource_opts,
mk(
ref("creation_opts_" ++ atom_to_list(Type)),
?R_REF("creation_opts_" ++ atom_to_list(Type)),
#{
required => false,
default => #{},
@ -185,6 +179,8 @@ resource_creation_fields("redis_cluster") ->
resource_creation_fields(_) ->
emqx_resource_schema:fields("creation_opts").
desc(action_parameters) ->
?DESC("desc_action_parameters");
desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->

View File

@ -0,0 +1,98 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_redis_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
bridge_v1_config_to_action_config/2,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_connector_config/1,
bridge_v1_type_name_fun/1
]).
-import(emqx_utils_conv, [bin/1]).
-define(SCHEMA_MODULE, emqx_bridge_redis_schema).
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
%% v2 action and connector type names for the unified redis bridge.
action_type_name() -> redis.
connector_type_name() -> redis.
%% Module that owns the hocon schema for this action/connector type.
schema_module() -> ?SCHEMA_MODULE.
%% Rebuild a legacy (v1) bridge config from a v2 connector + action pair.
%% Both configs have their nested <<"parameters">> map lifted to the top
%% level; the action's <<"connector">> reference is dropped; connector
%% fields take precedence on collision; finally the v1 <<"type">> field is
%% restored from the redis_type value.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
    FlatAction0 = map_unindent(<<"parameters">>, ActionConfig),
    FlatAction = maps:without([<<"connector">>], FlatAction0),
    FlatConnector = map_unindent(<<"parameters">>, ConnectorConfig),
    fix_v1_type(maps:merge(FlatAction, FlatConnector)).
%% Convert a legacy (v1) bridge config into a v2 action config bound to
%% ConnectorName. Top-level action keys come from the redis_action schema;
%% keys belonging to action_parameters are nested under <<"parameters">>.
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
    ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
    ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
    %% Keep only the action-relevant keys, then indent the parameter subset.
    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
    ActionConfig#{<<"connector">> => ConnectorName}.
%% Derive the v2 connector config from a legacy (v1) bridge config: remove
%% keys that belong exclusively to the action, keep connector top-level keys
%% as-is, and nest the remaining keys under <<"parameters">>.
bridge_v1_config_to_connector_config(BridgeV1Config) ->
    ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
    ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
    ConnectorTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields("config_connector")),
    %% redis_type is force-kept inside parameters: the cluster type is needed
    %% downstream to filter resource_opts (cluster does not support batching).
    ConnectorKeys =
        (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++
            [<<"redis_type">>],
    ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
    make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config).
%%------------------------------------------------------------------------------------------
%% Internal helper fns
%%------------------------------------------------------------------------------------------
%% The legacy (v1) type name depends on the configured redis_type, so this
%% returns a {ResolverFun, PossibleTypes} pair rather than a fixed atom.
bridge_v1_type_name() ->
    {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
%% Resolve the concrete v1 type from the connector config's redis_type.
%% Deliberately has no catch-all: an absent redis_type should crash here.
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
    v1_type(Type).
%% Restore the legacy <<"type">> field from the redis_type carried in Conf.
fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) ->
    Conf#{<<"type">> => v1_type(RedisType)}.
%% Map a connector redis_type binary to the corresponding legacy v1 bridge
%% type atom. No catch-all clause: an unknown value is a programmer error
%% and should crash with function_clause.
v1_type(<<"cluster">>) -> redis_cluster;
v1_type(<<"sentinel">>) -> redis_sentinel;
v1_type(<<"single">>) -> redis_single.
bridge_v1_type_names() -> [redis_single, redis_sentinel, redis_cluster].
%% Lift the nested map stored under Key to the top level of Map, removing
%% the Key entry itself. On key collision the existing top-level entries
%% take precedence over the lifted ones. Crashes (badkey) if Key is absent.
map_unindent(Key, Map) ->
    Nested = maps:get(Key, Map),
    Rest = maps:remove(Key, Map),
    maps:merge(Nested, Rest).
%% Inverse of map_unindent/2: move the PickKeys subset of Map into a nested
%% map stored under IndentKey; all other entries stay at the top level.
map_indent(IndentKey, PickKeys, Map) ->
    Picked = maps:with(PickKeys, Map),
    Remaining = maps:without(PickKeys, Map),
    Remaining#{IndentKey => Picked}.
%% Extract the field names from a hocon schema field list ([{Name, Schema}])
%% as binaries.
schema_keys(Schema) ->
    lists:map(fun({Key, _FieldSchema}) -> bin(Key) end, Schema).
%% Keep only PickKeys from Config, then nest the IndentKeys subset under
%% <<"parameters">>.
make_config_map(PickKeys, IndentKeys, Config) ->
    Conf0 = maps:with(PickKeys, Config),
    map_indent(<<"parameters">>, IndentKeys, Conf0).

View File

@ -4,6 +4,7 @@
-module(emqx_bridge_redis_connector).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource).
@ -11,11 +12,15 @@
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_start/2,
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
on_get_status/2,
on_get_channel_status/3
]).
%% -------------------------------------------------------------------------------------------------
@ -24,7 +29,34 @@
callback_mode() -> always_sync.
on_start(InstId, #{command_template := CommandTemplate} = Config) ->
%% Register a channel (action) on this connector: the redis command template
%% is pre-processed once here, at channel-add time, and cached in the
%% channels map keyed by ChannelId so on_query/on_batch_query only render.
on_add_channel(
    _InstanceId,
    State = #{channels := Channels},
    ChannelId,
    #{
        parameters := #{
            command_template := Template
        }
    }
) ->
    Channels2 = Channels#{
        ChannelId => #{template => preproc_command_template(Template)}
    },
    {ok, State#{channels => Channels2}}.
%% Deregister a channel: drop its cached template from the channels map.
%% Removing an unknown ChannelId is a no-op (maps:remove/2 semantics).
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
    RemainingChannels = maps:remove(ChannelId, Channels),
    {ok, State#{channels := RemainingChannels}}.
%% Delegate channel listing to the bridge v2 registry for this connector.
on_get_channels(InstanceId) ->
    emqx_bridge_v2:get_channels_for_connector(InstanceId).
%% A channel is reported healthy purely by its presence in the channels map;
%% no per-channel redis round-trip is performed here (the connector-level
%% on_get_status/2 covers connectivity).
on_get_channel_status(_ConnectorResId, ChannelId, #{channels := Channels}) ->
    case maps:is_key(ChannelId, Channels) of
        true -> ?status_connected;
        false -> ?status_disconnected
    end.
on_start(InstId, Config) ->
case emqx_redis:on_start(InstId, Config) of
{ok, RedisConnSt} ->
?tp(
@ -33,7 +65,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) ->
),
{ok, #{
conn_st => RedisConnSt,
command_template => preproc_command_template(CommandTemplate)
channels => #{}
}};
{error, {start_pool_failed, _, #{type := authentication_error, reason := Reason}}} = Error ->
?tp(
@ -57,14 +89,8 @@ on_stop(InstId, undefined = _State) ->
on_get_status(InstId, #{conn_st := RedisConnSt}) ->
emqx_redis:on_get_status(InstId, RedisConnSt).
on_query(
InstId,
{send_message, Data},
_State = #{
command_template := CommandTemplate, conn_st := RedisConnSt
}
) ->
Cmd = proc_command_template(CommandTemplate, Data),
%% raw cmd without template, for CI test
on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) ->
?tp(
redis_bridge_connector_cmd,
#{cmd => Cmd, batch => false, mode => sync}
@ -77,24 +103,30 @@ on_query(
Result;
on_query(
InstId,
Query,
_State = #{conn_st := RedisConnSt}
{_MessageTag, _Data} = Msg,
#{channels := Channels, conn_st := RedisConnSt}
) ->
case try_render_message([Msg], Channels) of
{ok, [Cmd]} ->
?tp(
redis_bridge_connector_query,
#{query => Query, batch => false, mode => sync}
redis_bridge_connector_cmd,
#{cmd => Cmd, batch => false, mode => sync}
),
Result = query(InstId, Query, RedisConnSt),
Result = query(InstId, {cmd, Cmd}, RedisConnSt),
?tp(
redis_bridge_connector_send_done,
#{query => Query, batch => false, mode => sync, result => Result}
#{cmd => Cmd, batch => false, mode => sync, result => Result}
),
Result.
Result;
Error ->
Error
end.
on_batch_query(
InstId, BatchData, _State = #{command_template := CommandTemplate, conn_st := RedisConnSt}
InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt}
) ->
Cmds = process_batch_data(BatchData, CommandTemplate),
case try_render_message(BatchData, Channels) of
{ok, Cmds} ->
?tp(
redis_bridge_connector_send,
#{batch_data => BatchData, batch => true, mode => sync}
@ -110,12 +142,29 @@ on_batch_query(
result => Result
}
),
Result.
Result;
Error ->
Error
end.
%% -------------------------------------------------------------------------------------------------
%% private helpers
%% -------------------------------------------------------------------------------------------------
%% Render redis commands for a list of {MessageTag, Data} pairs using the
%% per-channel templates cached in Channels. Returns {ok, Cmds} with input
%% order preserved, or an unrecoverable error when a tag has no channel.
try_render_message(Datas, Channels) ->
    try_render_message(Datas, Channels, []).

try_render_message([{MessageTag, Data} | T], Channels, Acc) ->
    case maps:find(MessageTag, Channels) of
        {ok, #{template := Template}} ->
            Msg = proc_command_template(Template, Data),
            try_render_message(T, Channels, [Msg | Acc]);
        _ ->
            %% Unknown tag: the channel was never added (or was removed).
            %% Retrying cannot succeed, hence tagged unrecoverable.
            {error, {unrecoverable_error, {invalid_message_tag, MessageTag}}}
    end;
try_render_message([], _Channels, Acc) ->
    %% Accumulated in reverse; restore the original message order.
    {ok, lists:reverse(Acc)}.
query(InstId, Query, RedisConnSt) ->
case emqx_redis:on_query(InstId, Query, RedisConnSt) of
{ok, _} = Ok -> Ok;
@ -123,14 +172,6 @@ query(InstId, Query, RedisConnSt) ->
{error, _} = Error -> Error
end.
process_batch_data(BatchData, CommandTemplate) ->
lists:map(
fun({send_message, Data}) ->
proc_command_template(CommandTemplate, Data)
end,
BatchData
).
proc_command_template(CommandTemplate, Msg) ->
lists:map(
fun(ArgTks) ->

View File

@ -0,0 +1,276 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_redis_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-define(TYPE, redis).
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1,
resource_opts_converter/2
]).
%% `emqx_bridge_v2_schema' "unofficial" API
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
%% hocon namespace for this schema; no standalone config roots are exported.
namespace() ->
    ?TYPE.

roots() ->
    [].
%%=========================================
%% Action fields
%%=========================================
%% hocon fields/1 callback.
%%
%% "config_connector": the v2 connector schema — common connector fields,
%% plus a required `parameters' union covering the three redis deployment
%% modes, plus the shared redis and ssl field groups.
fields("config_connector") ->
    emqx_connector_schema:common_fields() ++
        [
            {parameters,
                ?HOCON(
                    hoconsc:union([
                        ?R_REF(emqx_redis, redis_single_connector),
                        ?R_REF(emqx_redis, redis_sentinel_connector),
                        ?R_REF(emqx_redis, redis_cluster_connector)
                    ]),
                    #{required => true, desc => ?DESC(redis_parameters)}
                )}
        ] ++
        emqx_redis:redis_fields() ++
        emqx_connector_schema_lib:ssl_fields();
%% The `redis' entry under the actions root: a name-keyed map of actions.
%% resource_opts_converter/2 normalizes resource_opts per redis_type.
fields(action) ->
    {?TYPE,
        ?HOCON(
            ?MAP(name, ?R_REF(redis_action)),
            #{
                desc => <<"Redis Action Config">>,
                converter => fun ?MODULE:resource_opts_converter/2,
                required => false
            }
        )};
%% One redis action: the generic producer-action schema with its
%% resource_opts replaced by our local variant, plus a mandatory redis_type
%% discriminator prepended.
fields(redis_action) ->
    Schema =
        emqx_bridge_v2_schema:make_producer_action_schema(
            ?HOCON(
                ?R_REF(emqx_bridge_redis, action_parameters),
                #{
                    required => true,
                    desc => ?DESC(producer_action)
                }
            )
        ),
    ResOpts =
        {resource_opts,
            ?HOCON(
                ?R_REF(resource_opts),
                #{
                    required => true,
                    desc => ?DESC(emqx_resource_schema, resource_opts)
                }
            )},
    RedisType =
        {redis_type,
            ?HOCON(
                ?ENUM([single, sentinel, cluster]),
                #{required => true, desc => ?DESC(redis_type)}
            )},
    [RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)];
%% Local resource_opts: standard creation opts with redis-specific docs for
%% the batching knobs.
fields(resource_opts) ->
    emqx_resource_schema:create_opts([
        {batch_size, #{desc => ?DESC(batch_size)}},
        {batch_time, #{desc => ?DESC(batch_time)}}
    ]);
%%=========================================
%% HTTP API fields
%%=========================================
%% The post/put/get variants below compose the base schemas with type/name
%% fields (post) and status fields (get), following the usual API pattern.
fields("post_connector") ->
    emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("config_connector");
fields("put_connector") ->
    fields("config_connector");
fields("get_connector") ->
    emqx_bridge_schema:status_fields() ++
        fields("post_connector");
fields("get_bridge_v2") ->
    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
fields("post_bridge_v2") ->
    emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_bridge_v2");
fields("put_bridge_v2") ->
    fields(redis_action);
fields("get_single") ->
    emqx_bridge_schema:status_fields() ++ fields("put_single");
fields("put_single") ->
    fields("config_connector");
fields("post_single") ->
    emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_single").
%% hocon desc/1 callback: documentation lookups for the schema roots above;
%% anything else is undocumented.
desc("config_connector") ->
    ?DESC(emqx_bridge_redis, "desc_config");
desc(redis_action) ->
    ?DESC(redis_action);
desc(resource_opts) ->
    ?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
    undefined.
%% hocon converter for the action map: redis cluster does not support
%% batching, so for every action whose redis_type is cluster the
%% resource_opts are forced to batch_size 1 / batch_time 0ms. Other actions
%% (and an absent config) pass through untouched.
resource_opts_converter(undefined, _Opts) ->
    undefined;
resource_opts_converter(Conf, _Opts) ->
    DisableBatch = fun
        (_Name, #{<<"redis_type">> := <<"cluster">>} = SubConf) ->
            ResOpts0 = maps:get(<<"resource_opts">>, SubConf, #{}),
            ResOpts = ResOpts0#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>},
            SubConf#{<<"resource_opts">> => ResOpts};
        (_Name, SubConf) ->
            SubConf
    end,
    maps:map(DisableBatch, Conf).
%%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API
%%-------------------------------------------------------------------------------------------------
%% Example action configs for the HTTP API docs, one entry per redis
%% deployment mode, rendered for the given Method (get | put | post).
bridge_v2_examples(Method) ->
    [
        #{
            <<"redis_single_producer">> => #{
                summary => <<"Redis Single Producer Action">>,
                value => action_example(single, Method)
            }
        },
        #{
            <<"redis_sentinel_producer">> => #{
                summary => <<"Redis Sentinel Producer Action">>,
                value => action_example(sentinel, Method)
            }
        },
        #{
            <<"redis_cluster_producer">> => #{
                summary => <<"Redis Cluster Producer Action">>,
                value => action_example(cluster, Method)
            }
        }
    ].
%% Example connector configs for the HTTP API docs, one entry per redis
%% deployment mode, rendered for the given Method (get | put | post).
connector_examples(Method) ->
    [
        #{
            <<"redis_single_producer">> => #{
                summary => <<"Redis Single Producer Connector">>,
                value => connector_example(single, Method)
            }
        },
        #{
            <<"redis_cluster_producer">> => #{
                summary => <<"Redis Cluster Producer Connector">>,
                value => connector_example(cluster, Method)
            }
        },
        #{
            <<"redis_sentinel_producer">> => #{
                summary => <<"Redis Sentinel Producer Connector">>,
                value => connector_example(sentinel, Method)
            }
        }
    ].
%% Legacy (v1) bridge examples are delegated to the old schema module.
conn_bridge_examples(Method) ->
    emqx_bridge_redis:conn_bridge_examples(Method).
%% Build an example action config for the docs. The `put' clause is the base
%% payload; `post' adds the type/name pair and `get' adds status fields on
%% top of it.
action_example(RedisType, post) ->
    Base = action_example(RedisType, put),
    Base#{
        type => <<"redis">>,
        name => <<"my_action">>
    };
action_example(RedisType, get) ->
    Base = action_example(RedisType, put),
    Base#{
        status => <<"connected">>,
        node_status => [
            #{
                node => <<"emqx@localhost">>,
                status => <<"connected">>
            }
        ]
    };
action_example(RedisType, put) ->
    #{
        enable => true,
        redis_type => RedisType,
        connector => <<"my_connector_name">>,
        description => <<"My action">>,
        parameters => #{
            command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>]
        },
        resource_opts => #{batch_size => 1}
    }.
%% Build an example connector config for the docs. The `put' clause is the
%% base payload; `get' layers status fields and `post' layers type/name on
%% top of it.
connector_example(RedisType, get) ->
    maps:merge(
        connector_example(RedisType, put),
        #{
            status => <<"connected">>,
            node_status => [
                #{
                    node => <<"emqx@localhost">>,
                    status => <<"connected">>
                }
            ]
        }
    );
connector_example(RedisType, post) ->
    maps:merge(
        connector_example(RedisType, put),
        #{
            %% NOTE(review): the example type is <<"redis_single_producer">>
            %% for every RedisType (including sentinel/cluster) — confirm
            %% this is intentional and not a leftover hard-coded value.
            type => <<"redis_single_producer">>,
            name => <<"my_connector">>
        }
    );
connector_example(RedisType, put) ->
    #{
        enable => true,
        desc => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>,
        parameters => connector_parameter(RedisType),
        pool_size => 8,
        database => 1,
        username => <<"test">>,
        password => <<"******">>,
        auto_reconnect => true,
        ssl => #{enable => false}
    }.
%% Example `parameters' payload for each redis deployment mode: single uses
%% one `server', sentinel/cluster use a `servers' list, and sentinel also
%% names its master group.
connector_parameter(single) ->
    #{redis_type => single, server => <<"127.0.0.1:6379">>};
connector_parameter(sentinel) ->
    #{
        redis_type => sentinel,
        servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
        sentinel => <<"myredismaster">>
    };
connector_parameter(cluster) ->
    #{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>}.

View File

@ -56,6 +56,7 @@
).
all() -> [{group, transports}, {group, rest}].
suite() -> [{timetrap, {minutes, 20}}].
groups() ->
ResourceSpecificTCs = [t_create_delete_bridge],
@ -143,15 +144,19 @@ redis_checks() ->
end.
end_per_suite(_Config) ->
ok = delete_all_bridges(),
ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]),
_ = application:stop(emqx_connector),
ok.
init_per_testcase(_Testcase, Config) ->
init_per_testcase(Testcase, Config0) ->
emqx_logger:set_log_level(debug),
ok = delete_all_rules(),
ok = delete_all_bridges(),
ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<(atom_to_binary(Testcase))/binary, UniqueNum/binary>>,
Config = [{bridge_name, Name} | Config0],
case {?config(connector_type, Config), ?config(batch_mode, Config)} of
{undefined, _} ->
Config;
@ -165,7 +170,13 @@ init_per_testcase(_Testcase, Config) ->
IsBatch = (BatchMode =:= batch_on),
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
[{bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config]
BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"),
[
{bridge_type, BridgeType},
{bridge_config, BridgeConfig1},
{is_batch, IsBatch}
| Config
]
end.
end_per_testcase(_Testcase, Config) ->
@ -173,10 +184,18 @@ end_per_testcase(_Testcase, Config) ->
ProxyPort = ?config(proxy_port, Config),
ok = snabbkaffe:stop(),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = delete_all_bridges().
ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors().
t_create_delete_bridge(Config) ->
Name = <<"mybridge">>,
Pid = erlang:whereis(eredis_sentinel),
ct:pal("t_create_detele_bridge:~p~n", [
#{
config => Config,
sentinel => Pid,
eredis_sentinel => Pid =/= undefined andalso erlang:process_info(Pid)
}
]),
Name = ?config(bridge_name, Config),
Type = ?config(connector_type, Config),
BridgeConfig = ?config(bridge_config, Config),
IsBatch = ?config(is_batch, Config),
@ -184,13 +203,11 @@ t_create_delete_bridge(Config) ->
{ok, _},
emqx_bridge:create(Type, Name, BridgeConfig)
),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?WAIT(
{ok, connected},
emqx_resource:health_check(ResourceId),
5
10
),
RedisType = atom_to_binary(Type),
@ -244,7 +261,7 @@ t_check_values(_Config) ->
).
t_check_replay(Config) ->
Name = <<"toxic_bridge">>,
Name = ?config(bridge_name, Config),
Type = <<"redis_single">>,
Topic = <<"local_topic/test">>,
ProxyName = "redis_single_tcp",
@ -324,15 +341,15 @@ t_permanent_error(_Config) ->
),
ok = emqx_bridge:remove(Type, Name).
t_auth_username_password(_Config) ->
Name = <<"mybridge">>,
t_auth_username_password(Config) ->
Name = ?config(bridge_name, Config),
Type = <<"redis_single">>,
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeConfig = username_password_redis_bridge_config(),
?assertMatch(
{ok, _},
emqx_bridge:create(Type, Name, BridgeConfig)
),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?WAIT(
{ok, connected},
emqx_resource:health_check(ResourceId),
@ -340,16 +357,16 @@ t_auth_username_password(_Config) ->
),
ok = emqx_bridge:remove(Type, Name).
t_auth_error_username_password(_Config) ->
Name = <<"mybridge">>,
t_auth_error_username_password(Config) ->
Name = ?config(bridge_name, Config),
Type = <<"redis_single">>,
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeConfig0 = username_password_redis_bridge_config(),
BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
?assertMatch(
{ok, _},
emqx_bridge:create(Type, Name, BridgeConfig)
),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?WAIT(
{ok, disconnected},
emqx_resource:health_check(ResourceId),
@ -361,16 +378,16 @@ t_auth_error_username_password(_Config) ->
),
ok = emqx_bridge:remove(Type, Name).
t_auth_error_password_only(_Config) ->
Name = <<"mybridge">>,
t_auth_error_password_only(Config) ->
Name = ?config(bridge_name, Config),
Type = <<"redis_single">>,
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeConfig0 = toxiproxy_redis_bridge_config(),
BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}),
?assertMatch(
{ok, _},
emqx_bridge:create(Type, Name, BridgeConfig)
),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
?assertEqual(
{ok, disconnected},
emqx_resource:health_check(ResourceId)
@ -382,7 +399,7 @@ t_auth_error_password_only(_Config) ->
ok = emqx_bridge:remove(Type, Name).
t_create_disconnected(Config) ->
Name = <<"toxic_bridge">>,
Name = ?config(bridge_name, Config),
Type = <<"redis_single">>,
?check_trace(
@ -450,10 +467,8 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
added_msgs(ResourceId, BaseTopic, Payload) ->
lists:flatmap(
fun(K) ->
{ok, Results} = emqx_resource:simple_sync_query(
ResourceId,
{cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}
),
Message = {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]},
{ok, Results} = emqx_resource:simple_sync_query(ResourceId, Message),
[El || El <- Results, El =:= Payload]
end,
[format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)]
@ -482,14 +497,6 @@ delete_all_rules() ->
emqx_rule_engine:get_rules()
).
delete_all_bridges() ->
lists:foreach(
fun(#{name := Name, type := Type}) ->
emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
).
all_test_hosts() ->
Confs = [
?REDIS_TOXYPROXY_CONNECT_CONFIG
@ -554,12 +561,12 @@ redis_connect_configs() ->
tcp => #{
<<"servers">> => <<"redis-sentinel:26379">>,
<<"redis_type">> => <<"sentinel">>,
<<"sentinel">> => <<"mymaster">>
<<"sentinel">> => <<"mytcpmaster">>
},
tls => #{
<<"servers">> => <<"redis-sentinel-tls:26380">>,
<<"redis_type">> => <<"sentinel">>,
<<"sentinel">> => <<"mymaster">>,
<<"sentinel">> => <<"mytlsmaster">>,
<<"ssl">> => redis_connect_ssl_opts(redis_sentinel)
}
},

View File

@ -9,6 +9,9 @@
stdlib,
ecpool,
emqx_resource,
eredis,
%% eredis_cluster has supervisor should be started before emqx_connector
%% otherwise the first start redis_cluster will fail.
eredis_cluster,
ehttpc,
jose,

View File

@ -42,6 +42,8 @@ resource_type(syskeeper_proxy) ->
emqx_bridge_syskeeper_proxy_server;
resource_type(timescale) ->
emqx_postgresql;
resource_type(redis) ->
emqx_bridge_redis_connector;
resource_type(Type) ->
error({unknown_connector_type, Type}).
@ -139,6 +141,14 @@ connector_structs() ->
desc => <<"Matrix Connector Config">>,
required => false
}
)},
{redis,
mk(
hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
#{
desc => <<"Redis Connector Config">>,
required => false
}
)}
].
@ -153,7 +163,8 @@ schema_modules() ->
emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy,
emqx_bridge_timescale,
emqx_postgresql_connector_schema
emqx_postgresql_connector_schema,
emqx_bridge_redis_schema
].
api_schemas(Method) ->
@ -177,7 +188,8 @@ api_schemas(Method) ->
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector")
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->

View File

@ -103,17 +103,28 @@ schema_modules() ->
[emqx_bridge_http_schema].
-endif.
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(http) -> [http, webhook];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(matrix) -> [matrix];
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(pgsql) -> [pgsql];
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) -> [];
connector_type_to_bridge_types(timescale) -> [timescale].
connector_type_to_bridge_types(azure_event_hub_producer) ->
[azure_event_hub_producer];
connector_type_to_bridge_types(confluent_producer) ->
[confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) ->
[gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(kafka_producer) ->
[kafka, kafka_producer];
connector_type_to_bridge_types(matrix) ->
[matrix];
connector_type_to_bridge_types(mongodb) ->
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(pgsql) ->
[pgsql];
connector_type_to_bridge_types(syskeeper_forwarder) ->
[syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) ->
[];
connector_type_to_bridge_types(timescale) ->
[timescale];
connector_type_to_bridge_types(redis) ->
[redis, redis_single, redis_sentinel, redis_cluster].
actions_config_name() -> <<"actions">>.
@ -158,7 +169,7 @@ split_bridge_to_connector_and_action(
BridgeType, BridgeV1Conf
);
false ->
%% We do an automatic transfomation to get the connector config
%% We do an automatic transformation to get the connector config
%% if the callback is not defined.
%% Get connector fields from bridge config
lists:foldl(

View File

@ -844,7 +844,7 @@ parse_object_loop(PropList0, Module, Options) ->
),
parse_object_loop(PropList, Module, Options, _Props = [], _Required = [], _Refs = []).
parse_object_loop([], _Modlue, _Options, Props, Required, Refs) ->
parse_object_loop([], _Module, _Options, Props, Required, Refs) ->
{lists:reverse(Props), lists:usort(Required), Refs};
parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs) ->
NameBin = to_bin(Name),

View File

@ -204,7 +204,7 @@ data_export(post, _Request) ->
data_import(post, #{body := #{<<"filename">> := FileName} = Body}) ->
case safe_parse_node(Body) of
{error, Msg} ->
{400, #{code => 'BAD_REQUEST', message => Msg}};
{400, #{code => ?BAD_REQUEST, message => Msg}};
FileNode ->
CoreNode = core_node(FileNode),
response(
@ -231,20 +231,23 @@ data_files(post, #{body := #{<<"filename">> := #{type := _} = File}}) ->
ok ->
{204};
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => emqx_mgmt_data_backup:format_error(Reason)}}
{400, #{code => ?BAD_REQUEST, message => emqx_mgmt_data_backup:format_error(Reason)}}
end;
data_files(post, #{body := _}) ->
{400, #{code => ?BAD_REQUEST, message => "Missing required parameter: filename"}};
data_files(get, #{query_string := PageParams}) ->
case emqx_mgmt_api:parse_pager_params(PageParams) of
false ->
{400, #{code => ?BAD_REQUEST, message => <<"page_limit_invalid">>}};
#{page := Page, limit := Limit} = Pager ->
{200, #{data => list_backup_files(Page, Limit), meta => Pager}}
{Count, HasNext, Data} = list_backup_files(Page, Limit),
{200, #{data => Data, meta => Pager#{count => Count, hasnext => HasNext}}}
end.
data_file_by_name(Method, #{bindings := #{filename := Filename}, query_string := QS}) ->
case safe_parse_node(QS) of
{error, Msg} ->
{400, #{code => 'BAD_REQUEST', message => Msg}};
{400, #{code => ?BAD_REQUEST, message => Msg}};
Node ->
case get_or_delete_file(Method, Filename, Node) of
{error, not_found} ->
@ -293,7 +296,10 @@ response({error, Reason}) ->
list_backup_files(Page, Limit) ->
Start = Page * Limit - Limit + 1,
lists:sublist(list_backup_files(), Start, Limit).
AllFiles = list_backup_files(),
Count = length(AllFiles),
HasNext = Start + Limit - 1 < Count,
{Count, HasNext, lists:sublist(AllFiles, Start, Limit)}.
list_backup_files() ->
Nodes = emqx:running_nodes(),

View File

@ -80,22 +80,32 @@ t_list_backups(Config) ->
[{ok, _} = export_backup(?NODE2_PORT, Auth) || _ <- lists:seq(1, 10)],
{ok, RespBody} = list_backups(?NODE1_PORT, Auth, <<"1">>, <<"100">>),
#{<<"data">> := Data, <<"meta">> := _} = emqx_utils_json:decode(RespBody),
#{<<"data">> := Data, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
RespBody
),
?assertEqual(20, length(Data)),
{ok, EmptyRespBody} = list_backups(?NODE2_PORT, Auth, <<"2">>, <<"100">>),
#{<<"data">> := EmptyData, <<"meta">> := _} = emqx_utils_json:decode(EmptyRespBody),
#{<<"data">> := EmptyData, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
EmptyRespBody
),
?assertEqual(0, length(EmptyData)),
{ok, RespBodyP1} = list_backups(?NODE3_PORT, Auth, <<"1">>, <<"10">>),
{ok, RespBodyP2} = list_backups(?NODE3_PORT, Auth, <<"2">>, <<"10">>),
{ok, RespBodyP3} = list_backups(?NODE3_PORT, Auth, <<"3">>, <<"10">>),
#{<<"data">> := DataP1, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP1),
#{<<"data">> := DataP1, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := true}} = emqx_utils_json:decode(
RespBodyP1
),
?assertEqual(10, length(DataP1)),
#{<<"data">> := DataP2, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP2),
#{<<"data">> := DataP2, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
RespBodyP2
),
?assertEqual(10, length(DataP2)),
#{<<"data">> := DataP3, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP3),
#{<<"data">> := DataP3, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
RespBodyP3
),
?assertEqual(0, length(DataP3)),
?assertEqual(Data, DataP1 ++ DataP2).

View File

@ -20,7 +20,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-export([roots/0, fields/1]).
-export([roots/0, fields/1, redis_fields/0, desc/1]).
-behaviour(emqx_resource).
@ -50,55 +50,46 @@ roots() ->
{config, #{
type => hoconsc:union(
[
hoconsc:ref(?MODULE, cluster),
hoconsc:ref(?MODULE, single),
hoconsc:ref(?MODULE, sentinel)
?R_REF(redis_cluster),
?R_REF(redis_single),
?R_REF(redis_sentinel)
]
)
}}
].
fields(single) ->
[
{server, server()},
{redis_type, #{
type => single,
default => single,
required => false,
desc => ?DESC("single")
}}
] ++
fields(redis_single) ->
fields(redis_single_connector) ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(cluster) ->
fields(redis_single_connector) ->
[
{servers, servers()},
{redis_type, #{
type => cluster,
default => cluster,
required => false,
desc => ?DESC("cluster")
}}
] ++
{server, server()},
redis_type(single)
];
fields(redis_cluster) ->
fields(redis_cluster_connector) ++
lists:keydelete(database, 1, redis_fields()) ++
emqx_connector_schema_lib:ssl_fields();
fields(sentinel) ->
fields(redis_cluster_connector) ->
[
{servers, servers()},
{redis_type, #{
type => sentinel,
default => sentinel,
required => false,
desc => ?DESC("sentinel")
}},
redis_type(cluster)
];
fields(redis_sentinel) ->
fields(redis_sentinel_connector) ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(redis_sentinel_connector) ->
[
{servers, servers()},
redis_type(sentinel),
{sentinel, #{
type => string(),
required => true,
desc => ?DESC("sentinel_desc")
}}
] ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields().
].
server() ->
Meta = #{desc => ?DESC("server")},
@ -108,64 +99,52 @@ servers() ->
Meta = #{desc => ?DESC("servers")},
emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS).
desc(redis_cluster_connector) ->
?DESC(redis_cluster_connector);
desc(redis_single_connector) ->
?DESC(redis_single_connector);
desc(redis_sentinel_connector) ->
?DESC(redis_sentinel_connector);
desc(_) ->
undefined.
%% ===================================================================
redis_type(Type) ->
{redis_type, #{
type => Type,
default => Type,
required => false,
desc => ?DESC(Type)
}}.
callback_mode() -> always_sync.
on_start(
InstId,
#{
redis_type := Type,
pool_size := PoolSize,
ssl := SSL
} = Config
) ->
on_start(InstId, Config0) ->
?SLOG(info, #{
msg => "starting_redis_connector",
connector => InstId,
config => emqx_utils:redact(Config)
config => emqx_utils:redact(Config0)
}),
ConfKey =
case Type of
single -> server;
_ -> servers
end,
Servers0 = maps:get(ConfKey, Config),
Servers1 = lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)
),
Servers = [{servers, Servers1}],
Database =
case Type of
cluster -> [];
_ -> [{database, maps:get(database, Config)}]
end,
Config = config(Config0),
#{pool_size := PoolSize, ssl := SSL, redis_type := Type} = Config,
Options = ssl_options(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
Opts =
[
{pool_size, PoolSize},
{username, maps:get(username, Config, undefined)},
{password, maps:get(password, Config, "")},
{servers, servers(Config)},
{options, Options},
{pool_size, PoolSize},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
] ++ Database ++ Servers,
Options =
case maps:get(enable, SSL) of
true ->
[
{ssl, true},
{ssl_options, emqx_tls_lib:to_client_opts(SSL)}
];
false ->
[{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
] ++ database(Config),
State = #{pool_name => InstId, type => Type},
ok = emqx_resource:allocate_resource(InstId, type, Type),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case Type of
cluster ->
case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of
case eredis_cluster:start_pool(InstId, Opts) of
{ok, _} ->
{ok, State};
{ok, _, _} ->
@ -174,7 +153,7 @@ on_start(
{error, Reason}
end;
_ ->
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of
case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
ok ->
{ok, State};
{error, Reason} ->
@ -182,6 +161,14 @@ on_start(
end
end.
ssl_options(SSL = #{enable := true}) ->
[
{ssl, true},
{ssl_options, emqx_tls_lib:to_client_opts(SSL)}
];
ssl_options(#{enable := false}) ->
[{ssl, false}].
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_redis_connector",
@ -189,7 +176,11 @@ on_stop(InstId, _State) ->
}),
case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName, type := cluster} ->
eredis_cluster:stop_pool(PoolName);
case eredis_cluster:stop_pool(PoolName) of
{error, not_found} -> ok;
ok -> ok;
Error -> Error
end;
#{pool_name := PoolName, type := _} ->
emqx_resource_pool:stop(PoolName);
_ ->
@ -244,8 +235,17 @@ is_unrecoverable_error(_) ->
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
case eredis_cluster:pool_exists(PoolName) of
true ->
%% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
%% we need restart eredis_cluster pool when pool_worker(slot) is empty.
%% If the pool is empty, it means that there are no workers attempting to reconnect.
%% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
case eredis_cluster_monitor:get_all_pools(PoolName) of
[] ->
disconnected;
[_ | _] ->
Health = eredis_cluster:ping_all(PoolName),
status_result(Health);
status_result(Health)
end;
false ->
disconnected
end;
@ -289,6 +289,28 @@ wrap_qp_result(Results) when is_list(Results) ->
end.
%% ===================================================================
%% parameters for connector
config(#{parameters := #{} = Param} = Config) ->
maps:merge(maps:remove(parameters, Config), Param);
%% is for authn/authz
config(Config) ->
Config.
servers(#{server := Server}) ->
servers(Server);
servers(#{servers := Servers}) ->
servers(Servers);
servers(Servers) ->
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers, ?REDIS_HOST_OPTIONS)
).
database(#{redis_type := cluster}) -> [];
database(#{database := Database}) -> [{database, Database}].
connect(Opts) ->
eredis:start_link(Opts).

View File

@ -223,7 +223,7 @@ redis_config_base(Type, ServerKey) ->
"sentinel" ->
Host = ?REDIS_SENTINEL_HOST,
Port = ?REDIS_SENTINEL_PORT,
MaybeSentinel = " sentinel = mymaster\n",
MaybeSentinel = " sentinel = mytcpmaster\n",
MaybeDatabase = " database = 1\n";
"single" ->
Host = ?REDIS_SINGLE_HOST,

View File

@ -13,7 +13,11 @@
uuid,
emqx,
emqx_utils,
emqx_ctl
emqx_ctl,
    %% rule_engine should start after the bridge and connector apps,
    %% since it checks that referenced actions/connectors exist.
emqx_bridge,
emqx_connector
]},
{mod, {emqx_rule_engine_app, []}},
{env, []},

View File

@ -0,0 +1,2 @@
Upgrade QUIC stack, more features on the way!

View File

@ -64,7 +64,7 @@ defmodule EMQXUmbrella.MixProject do
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
# maybe forbid to fetch quicer
{:emqtt,
github: "emqx/emqtt", tag: "1.9.6", override: true, system_env: maybe_no_quic_env()},
github: "emqx/emqtt", tag: "1.9.7", override: true, system_env: maybe_no_quic_env()},
{:rulesql, github: "emqx/rulesql", tag: "0.1.7"},
{:observer_cli, "1.7.1"},
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
@ -830,7 +830,7 @@ defmodule EMQXUmbrella.MixProject do
defp quicer_dep() do
if enable_quicer?(),
# in conflict with emqx and emqtt
do: [{:quicer, github: "emqx/quic", tag: "0.0.202", override: true}],
do: [{:quicer, github: "emqx/quic", tag: "0.0.303", override: true}],
else: []
end

View File

@ -69,7 +69,7 @@
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.6"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.7"}}}
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}

View File

@ -39,7 +39,7 @@ bcrypt() ->
{bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.1"}}}.
quicer() ->
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}.
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.303"}}}.
jq() ->
{jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.12"}}}.

View File

@ -32,14 +32,19 @@ desc_type.desc:
desc_type.label:
"""Bridge Type"""
local_topic.desc:
desc_local_topic.desc:
"""The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic
matching the local_topic will be forwarded.<br/>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded."""
local_topic.label:
desc_local_topic.label:
"""Local Topic"""
desc_action_parameters.desc:
"""The parameters of the action."""
desc_action_parameters.label:
"""Action Parameters"""
}

View File

@ -0,0 +1,41 @@
emqx_bridge_redis_schema {
redis_parameters.label:
"""Redis Type Specific Parameters"""
redis_parameters.desc:
"""Set of parameters specific for the given type of this Redis connector, `redis_type` can be one of `single`, `cluster` or `sentinel`."""
producer_action.desc:
"""The parameters of the action."""
producer_action.label:
"""Action Parameters"""
redis_type.label:
"""Redis Type"""
redis_type.desc:
"""Single mode. Must be set to 'single' when Redis server is running in single mode.
Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode.
Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode."""
batch_size.label:
"""Batch Size"""
batch_size.desc:
"""This parameter defines the upper limit of the batch count.
Setting this value to 1 effectively disables batching, as it indicates that only one item will be processed per batch.
Note on Redis Cluster Mode:
In the context of Redis Cluster Mode, it is important to note that batching is not supported.
Consequently, the batch_size is always set to 1,
reflecting the mode's inherent limitation in handling batch operations."""
batch_time.desc:
"""Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage."""
batch_time.label:
"""Max batch wait time, disable when in Redis Cluster Mode."""
redis_action.label:
"""Redis Action"""
redis_action.desc:
"""Action to interact with a Redis connector."""
}

View File

@ -1,5 +1,11 @@
emqx_connector_schema {
config_enable.desc:
"""Enable (true) or disable (false) this connector."""
config_enable.label:
"""Enable or Disable"""
desc_connectors.desc:
"""Connectors that are used to connect to external systems"""
desc_connectors.label:

View File

@ -47,4 +47,18 @@ single.desc:
single.label:
"""Single Mode"""
redis_cluster_connector.label:
"""Redis Cluster Connector"""
redis_cluster_connector.desc:
"""Redis connector in cluster mode"""
redis_sentinel_connector.label:
"""Redis Sentinel Connector"""
redis_sentinel_connector.desc:
"""Redis connector in sentinel mode"""
redis_single_connector.label:
"""Redis Single Connector"""
redis_single_connector.desc:
"""Redis connector in sentinel mode"""
}

View File

@ -27,11 +27,21 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)"
DOWNLOAD_I18N_TRANSLATIONS=${DOWNLOAD_I18N_TRANSLATIONS:-true}
# download desc (i18n) translations
if [ "$DOWNLOAD_I18N_TRANSLATIONS" = "true" ]; then
echo "downloading i18n translation from emqx/emqx-i18n"
start=$(date +%s)
curl -L --silent --show-error \
--output "apps/emqx_dashboard/priv/desc.zh.hocon" \
"https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon"
end=$(date +%s)
duration=$(echo "$end $start" | awk '{print $1 - $2}')
echo "downloaded i18n translation in $duration seconds, set DOWNLOAD_I18N_TRANSLATIONS=false to skip"
else
echo "skipping to download i18n translation from emqx/emqx-i18n, set DOWNLOAD_I18N_TRANSLATIONS=true to update"
fi
# TODO
# make sbom a build artifcat
# make sbom a build artifact
# ./scripts/update-bom.sh "$PROFILE_STR" ./rel