From 0aec2f760574d9f34757fc4bfaf6e24e2713b104 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 23 Nov 2023 11:00:07 +0800 Subject: [PATCH] feat: redis bridge v2 --- .../redis/sentinel-tcp/sentinel-base.conf | 8 +- .../redis/sentinel-tls/sentinel-base.conf | 8 +- .../src/emqx_authn_redis_schema.erl | 8 +- .../src/emqx_authz_redis_schema.erl | 12 +- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- .../src/schema/emqx_bridge_v2_schema.erl | 6 +- .../src/emqx_bridge_redis.app.src | 6 +- .../src/emqx_bridge_redis.erl | 26 +- .../src/emqx_bridge_redis_action_info.erl | 98 +++++++ .../src/emqx_bridge_redis_connector.erl | 139 +++++---- .../src/emqx_bridge_redis_schema.erl | 276 ++++++++++++++++++ .../test/emqx_bridge_redis_SUITE.erl | 75 ++--- .../emqx_connector/src/emqx_connector.app.src | 3 + .../src/schema/emqx_connector_ee_schema.erl | 16 +- .../src/schema/emqx_connector_schema.erl | 34 ++- .../src/emqx_dashboard_swagger.erl | 2 +- apps/emqx_redis/src/emqx_redis.erl | 176 ++++++----- apps/emqx_redis/test/emqx_redis_SUITE.erl | 2 +- .../src/emqx_rule_engine.app.src | 6 +- rel/i18n/emqx_bridge_redis.hocon | 9 +- rel/i18n/emqx_bridge_redis_schema.hocon | 41 +++ rel/i18n/emqx_connector_schema.hocon | 6 + rel/i18n/emqx_redis.hocon | 14 + scripts/pre-compile.sh | 16 +- 24 files changed, 764 insertions(+), 226 deletions(-) create mode 100644 apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl create mode 100644 apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl create mode 100644 rel/i18n/emqx_bridge_redis_schema.hocon diff --git a/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf b/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf index 419f2a935..c43de536b 100644 --- a/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf +++ b/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf @@ -1,7 +1,7 @@ sentinel resolve-hostnames yes bind :: 0.0.0.0 -sentinel monitor mymaster redis-sentinel-master 6379 1 -sentinel auth-pass mymaster public -sentinel down-after-milliseconds mymaster 10000 -sentinel failover-timeout mymaster 20000 +sentinel monitor mytcpmaster redis-sentinel-master 6379 1 +sentinel auth-pass mytcpmaster public +sentinel down-after-milliseconds mytcpmaster 10000 +sentinel failover-timeout mytcpmaster 20000 diff --git a/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf b/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf index 8363ae383..7ea32f805 100644 --- a/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf +++ b/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf @@ -8,7 +8,7 @@ tls-key-file /etc/certs/key.pem tls-ca-cert-file /etc/certs/cacert.pem tls-auth-clients no -sentinel monitor mymaster redis-sentinel-tls-master 6389 1 -sentinel auth-pass mymaster public -sentinel down-after-milliseconds mymaster 10000 -sentinel failover-timeout mymaster 20000 +sentinel monitor mytlsmaster redis-sentinel-tls-master 6389 1 +sentinel auth-pass mytlsmaster public +sentinel down-after-milliseconds mytlsmaster 10000 +sentinel failover-timeout mytlsmaster 20000 diff --git a/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl b/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl index f3e124ca1..b72905f6b 100644 --- a/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl +++ b/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl @@ -64,12 +64,8 @@ refs(_) -> expected => "single | cluster | sentinel" }). -fields(redis_single) -> - common_fields() ++ emqx_redis:fields(single); -fields(redis_cluster) -> - common_fields() ++ emqx_redis:fields(cluster); -fields(redis_sentinel) -> - common_fields() ++ emqx_redis:fields(sentinel). +fields(Type) -> + common_fields() ++ emqx_redis:fields(Type). desc(redis_single) -> ?DESC(single); diff --git a/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl b/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl index 5cd084795..96949b0ea 100644 --- a/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl +++ b/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl @@ -34,17 +34,9 @@ namespace() -> "authz". type() -> ?AUTHZ_TYPE. -fields(redis_single) -> +fields(Type) -> emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ - emqx_redis:fields(single) ++ - [{cmd, cmd()}]; -fields(redis_sentinel) -> - emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ - emqx_redis:fields(sentinel) ++ - [{cmd, cmd()}]; -fields(redis_cluster) -> - emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ - emqx_redis:fields(cluster) ++ + emqx_redis:fields(Type) ++ [{cmd, cmd()}]. desc(redis_single) -> diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 4f195b417..ddb8424ae 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -81,7 +81,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_mongodb_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, - emqx_bridge_timescale_action_info + emqx_bridge_timescale_action_info, + emqx_bridge_redis_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 188a550fc..2c2dde4da 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -242,17 +242,17 @@ schema_homogeneous_test() -> is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> Fields = Module:fields(TypeName), ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()), - MissingFileds = lists:filter( + MissingFields = lists:filter( fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames ), - case MissingFileds of + case MissingFields of [] -> false; _ -> {true, #{ schema_module => Module, type_name => TypeName, - missing_fields => MissingFileds + missing_fields => MissingFields }} end. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 5b6163969..53130d188 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, @@ -9,7 +9,9 @@ emqx_resource, emqx_redis ]}, - {env, []}, + {env, [ + {emqx_action_info_modules, [emqx_bridge_redis_action_info]} + ]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl index 1c8ee75f9..beafc8775 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl @@ -8,9 +8,9 @@ -import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). --export([ - conn_bridge_examples/1 -]). +-export([conn_bridge_examples/1]). + +-export([type_name_fields/1, connector_fields/1]). -export([ namespace/0, @@ -100,6 +100,8 @@ namespace() -> "bridge_redis". roots() -> []. +fields(action_parameters) -> + [{command_template, fun command_template/1}]; fields("post_single") -> method_fields(post, redis_single); fields("post_sentinel") -> @@ -142,21 +144,13 @@ method_fields(put, ConnectorType) -> redis_bridge_common_fields(Type) -> emqx_bridge_schema:common_bridge_fields() ++ [ - {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {command_template, fun command_template/1} + {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})} + | fields(action_parameters) ] ++ resource_fields(Type). connector_fields(Type) -> - RedisType = bridge_type_to_redis_conn_type(Type), - emqx_redis:fields(RedisType). - -bridge_type_to_redis_conn_type(redis_single) -> - single; -bridge_type_to_redis_conn_type(redis_sentinel) -> - sentinel; -bridge_type_to_redis_conn_type(redis_cluster) -> - cluster. + emqx_redis:fields(Type). type_name_fields(Type) -> [ @@ -168,7 +162,7 @@ resource_fields(Type) -> [ {resource_opts, mk( - ref("creation_opts_" ++ atom_to_list(Type)), + ?R_REF("creation_opts_" ++ atom_to_list(Type)), #{ required => false, default => #{}, @@ -185,6 +179,8 @@ resource_creation_fields("redis_cluster") -> resource_creation_fields(_) -> emqx_resource_schema:fields("creation_opts"). +desc(action_parameters) -> + ?DESC("desc_action_parameters"); desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl new file mode 100644 index 000000000..22ed40093 --- /dev/null +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl @@ -0,0 +1,98 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_redis_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_action_config/2, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name_fun/1 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_redis_schema). +-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). + +action_type_name() -> redis. + +connector_type_name() -> redis. + +schema_module() -> ?SCHEMA_MODULE. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + fix_v1_type( + maps:merge( + maps:without( + [<<"connector">>], + map_unindent(<<"parameters">>, ActionConfig) + ), + map_unindent(<<"parameters">>, ConnectorConfig) + ) + ). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), + ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + ActionConfig#{<<"connector">> => ConnectorName}. + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), + ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields("config_connector")), + %% Need put redis_type into parameter. + %% cluster need type to filter resource_opts + ConnectorKeys = + (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++ + [<<"redis_type">>], + ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, + make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config). + +%%------------------------------------------------------------------------------------------ +%% Internal helper fns +%%------------------------------------------------------------------------------------------ + +bridge_v1_type_name() -> + {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}. +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) -> + v1_type(Type). + +fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) -> + Conf#{<<"type">> => v1_type(RedisType)}. + +v1_type(<<"single">>) -> redis_single; +v1_type(<<"sentinel">>) -> redis_sentinel; +v1_type(<<"cluster">>) -> redis_cluster. + +bridge_v1_type_names() -> [redis_single, redis_sentinel, redis_cluster]. + +map_unindent(Key, Map) -> + maps:merge( + maps:get(Key, Map), + maps:remove(Key, Map) + ). + +map_indent(IndentKey, PickKeys, Map) -> + maps:put( + IndentKey, + maps:with(PickKeys, Map), + maps:without(PickKeys, Map) + ). + +schema_keys(Schema) -> + [bin(Key) || {Key, _} <- Schema]. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + map_indent(<<"parameters">>, IndentKeys, Conf0). diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 696947726..4835e8127 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -4,6 +4,7 @@ -module(emqx_bridge_redis_connector). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(emqx_resource). @@ -11,11 +12,15 @@ %% callbacks of behaviour emqx_resource -export([ callback_mode/0, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, on_start/2, on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_get_channel_status/3 ]). %% ------------------------------------------------------------------------------------------------- @@ -24,7 +29,34 @@ callback_mode() -> always_sync. -on_start(InstId, #{command_template := CommandTemplate} = Config) -> +on_add_channel( + _InstanceId, + State = #{channels := Channels}, + ChannelId, + #{ + parameters := #{ + command_template := Template + } + } +) -> + Channels2 = Channels#{ + ChannelId => #{template => preproc_command_template(Template)} + }, + {ok, State#{channels => Channels2}}. + +on_remove_channel(_InstanceId, State = #{channels := Channels}, ChannelId) -> + {ok, State#{channels => maps:remove(ChannelId, Channels)}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_ConnectorResId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> ?status_connected; + false -> ?status_disconnected + end. + +on_start(InstId, Config) -> case emqx_redis:on_start(InstId, Config) of {ok, RedisConnSt} -> ?tp( @@ -33,7 +65,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) -> ), {ok, #{ conn_st => RedisConnSt, - command_template => preproc_command_template(CommandTemplate) + channels => #{} }}; {error, {start_pool_failed, _, #{type := authentication_error, reason := Reason}}} = Error -> ?tp( @@ -57,14 +89,8 @@ on_stop(InstId, undefined = _State) -> on_get_status(InstId, #{conn_st := RedisConnSt}) -> emqx_redis:on_get_status(InstId, RedisConnSt). -on_query( - InstId, - {send_message, Data}, - _State = #{ - command_template := CommandTemplate, conn_st := RedisConnSt - } -) -> - Cmd = proc_command_template(CommandTemplate, Data), +%% raw cmd without template, for CI test +on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) -> ?tp( redis_bridge_connector_cmd, #{cmd => Cmd, batch => false, mode => sync} @@ -77,45 +103,68 @@ on_query( Result; on_query( InstId, - Query, - _State = #{conn_st := RedisConnSt} + {_MessageTag, _Data} = Msg, + #{channels := Channels, conn_st := RedisConnSt} ) -> - ?tp( - redis_bridge_connector_query, - #{query => Query, batch => false, mode => sync} - ), - Result = query(InstId, Query, RedisConnSt), - ?tp( - redis_bridge_connector_send_done, - #{query => Query, batch => false, mode => sync, result => Result} - ), - Result. + case try_render_message([Msg], Channels) of + {ok, [Cmd]} -> + ?tp( + redis_bridge_connector_cmd, + #{cmd => Cmd, batch => false, mode => sync} + ), + Result = query(InstId, {cmd, Cmd}, RedisConnSt), + ?tp( + redis_bridge_connector_send_done, + #{cmd => Cmd, batch => false, mode => sync, result => Result} + ), + Result; + Error -> + Error + end. on_batch_query( - InstId, BatchData, _State = #{command_template := CommandTemplate, conn_st := RedisConnSt} + InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt} ) -> - Cmds = process_batch_data(BatchData, CommandTemplate), - ?tp( - redis_bridge_connector_send, - #{batch_data => BatchData, batch => true, mode => sync} - ), - Result = query(InstId, {cmds, Cmds}, RedisConnSt), - ?tp( - redis_bridge_connector_send_done, - #{ - batch_data => BatchData, - batch_size => length(BatchData), - batch => true, - mode => sync, - result => Result - } - ), - Result. + case try_render_message(BatchData, Channels) of + {ok, Cmds} -> + ?tp( + redis_bridge_connector_send, + #{batch_data => BatchData, batch => true, mode => sync} + ), + Result = query(InstId, {cmds, Cmds}, RedisConnSt), + ?tp( + redis_bridge_connector_send_done, + #{ + batch_data => BatchData, + batch_size => length(BatchData), + batch => true, + mode => sync, + result => Result + } + ), + Result; + Error -> + Error + end. %% ------------------------------------------------------------------------------------------------- %% private helpers %% ------------------------------------------------------------------------------------------------- +try_render_message(Datas, Channels) -> + try_render_message(Datas, Channels, []). + +try_render_message([{MessageTag, Data} | T], Channels, Acc) -> + case maps:find(MessageTag, Channels) of + {ok, #{template := Template}} -> + Msg = proc_command_template(Template, Data), + try_render_message(T, Channels, [Msg | Acc]); + _ -> + {error, {unrecoverable_error, {invalid_message_tag, MessageTag}}} + end; +try_render_message([], _Channels, Acc) -> + {ok, lists:reverse(Acc)}. + query(InstId, Query, RedisConnSt) -> case emqx_redis:on_query(InstId, Query, RedisConnSt) of {ok, _} = Ok -> Ok; @@ -123,14 +172,6 @@ query(InstId, Query, RedisConnSt) -> {error, _} = Error -> Error end. -process_batch_data(BatchData, CommandTemplate) -> - lists:map( - fun({send_message, Data}) -> - proc_command_template(CommandTemplate, Data) - end, - BatchData - ). - proc_command_template(CommandTemplate, Msg) -> lists:map( fun(ArgTks) -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl new file mode 100644 index 000000000..f02bf3322 --- /dev/null +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -0,0 +1,276 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_redis_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-define(TYPE, redis). + +%% `hocon_schema' API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1, + resource_opts_converter/2 +]). + +%% `emqx_bridge_v2_schema' "unofficial" API +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + ?TYPE. + +roots() -> + []. + +%%========================================= +%% Action fields +%%========================================= +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + [ + {parameters, + ?HOCON( + hoconsc:union([ + ?R_REF(emqx_redis, redis_single_connector), + ?R_REF(emqx_redis, redis_sentinel_connector), + ?R_REF(emqx_redis, redis_cluster_connector) + ]), + #{required => true, desc => ?DESC(redis_parameters)} + )} + ] ++ + emqx_redis:redis_fields() ++ + emqx_connector_schema_lib:ssl_fields(); +fields(action) -> + {?TYPE, + ?HOCON( + ?MAP(name, ?R_REF(redis_action)), + #{ + desc => <<"Redis Action Config">>, + converter => fun ?MODULE:resource_opts_converter/2, + required => false + } + )}; +fields(redis_action) -> + Schema = + emqx_bridge_v2_schema:make_producer_action_schema( + ?HOCON( + ?R_REF(emqx_bridge_redis, action_parameters), + #{ + required => true, + desc => ?DESC(producer_action) + } + ) + ), + ResOpts = + {resource_opts, + ?HOCON( + ?R_REF(resource_opts), + #{ + required => true, + desc => ?DESC(emqx_resource_schema, resource_opts) + } + )}, + RedisType = + {redis_type, + ?HOCON( + ?ENUM([single, sentinel, cluster]), + #{required => true, desc => ?DESC(redis_type)} + )}, + [RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)]; +fields(resource_opts) -> + emqx_resource_schema:create_opts([ + {batch_size, #{desc => ?DESC(batch_size)}}, + {batch_time, #{desc => ?DESC(batch_time)}} + ]); +%%========================================= +%% HTTP API fields +%%========================================= +fields("post_connector") -> + emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("config_connector"); +fields("put_connector") -> + fields("config_connector"); +fields("get_connector") -> + emqx_bridge_schema:status_fields() ++ + fields("post_connector"); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); +fields("post_bridge_v2") -> + emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_bridge_v2"); +fields("put_bridge_v2") -> + fields(redis_action); +fields("get_single") -> + emqx_bridge_schema:status_fields() ++ fields("put_single"); +fields("put_single") -> + fields("config_connector"); +fields("post_single") -> + emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_single"). + +desc("config_connector") -> + ?DESC(emqx_bridge_redis, "desc_config"); +desc(redis_action) -> + ?DESC(redis_action); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, resource_opts); +desc(_Name) -> + undefined. + +resource_opts_converter(undefined, _Opts) -> + undefined; +resource_opts_converter(Conf, _Opts) -> + maps:map( + fun(_Name, SubConf) -> + case SubConf of + #{<<"redis_type">> := <<"cluster">>} -> + ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}), + %% cluster don't support batch + SubConf#{ + <<"resource_opts">> => + ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>} + }; + _ -> + SubConf + end + end, + Conf + ). + +%%------------------------------------------------------------------------------------------------- +%% `emqx_bridge_v2_schema' "unofficial" API +%%------------------------------------------------------------------------------------------------- + +bridge_v2_examples(Method) -> + [ + #{ + <<"redis_single_producer">> => #{ + summary => <<"Redis Single Producer Action">>, + value => action_example(single, Method) + } + }, + #{ + <<"redis_sentinel_producer">> => #{ + summary => <<"Redis Sentinel Producer Action">>, + value => action_example(sentinel, Method) + } + }, + #{ + <<"redis_cluster_producer">> => #{ + summary => <<"Redis Cluster Producer Action">>, + value => action_example(cluster, Method) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"redis_single_producer">> => #{ + summary => <<"Redis Single Producer Connector">>, + value => connector_example(single, Method) + } + }, + #{ + <<"redis_cluster_producer">> => #{ + summary => <<"Redis Cluster Producer Connector">>, + value => connector_example(cluster, Method) + } + }, + #{ + <<"redis_sentinel_producer">> => #{ + summary => <<"Redis Sentinel Producer Connector">>, + value => connector_example(sentinel, Method) + } + } + ]. + +conn_bridge_examples(Method) -> + emqx_bridge_redis:conn_bridge_examples(Method). + +action_example(RedisType, post) -> + maps:merge( + action_example(RedisType, put), + #{ + type => <<"redis">>, + name => <<"my_action">> + } + ); +action_example(RedisType, get) -> + maps:merge( + action_example(RedisType, put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +action_example(RedisType, put) -> + #{ + redis_type => RedisType, + enable => true, + connector => <<"my_connector_name">>, + description => <<"My action">>, + parameters => #{ + command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>] + }, + resource_opts => #{batch_size => 1} + }. + +connector_example(RedisType, get) -> + maps:merge( + connector_example(RedisType, put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +connector_example(RedisType, post) -> + maps:merge( + connector_example(RedisType, put), + #{ + type => <<"redis_single_producer">>, + name => <<"my_connector">> + } + ); +connector_example(RedisType, put) -> + #{ + enable => true, + desc => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>, + parameters => connector_parameter(RedisType), + pool_size => 8, + database => 1, + username => <<"test">>, + password => <<"******">>, + auto_reconnect => true, + ssl => #{enable => false} + }. + +connector_parameter(single) -> + #{redis_type => single, server => <<"127.0.0.1:6379">>}; +connector_parameter(cluster) -> + #{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>}; +connector_parameter(sentinel) -> + #{ + redis_type => sentinel, + servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, + sentinel => <<"myredismaster">> + }. diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl index c2430c076..125d84d0f 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl @@ -56,6 +56,7 @@ ). all() -> [{group, transports}, {group, rest}]. +suite() -> [{timetrap, {minutes, 20}}]. groups() -> ResourceSpecificTCs = [t_create_delete_bridge], @@ -143,15 +144,19 @@ redis_checks() -> end. end_per_suite(_Config) -> - ok = delete_all_bridges(), + ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]), _ = application:stop(emqx_connector), ok. -init_per_testcase(_Testcase, Config) -> +init_per_testcase(Testcase, Config0) -> + emqx_logger:set_log_level(debug), ok = delete_all_rules(), - ok = delete_all_bridges(), + ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = <<(atom_to_binary(Testcase))/binary, UniqueNum/binary>>, + Config = [{bridge_name, Name} | Config0], case {?config(connector_type, Config), ?config(batch_mode, Config)} of {undefined, _} -> Config; @@ -165,7 +170,13 @@ init_per_testcase(_Testcase, Config) -> IsBatch = (BatchMode =:= batch_on), BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS), BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig}, - [{bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config] + BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"), + [ + {bridge_type, BridgeType}, + {bridge_config, BridgeConfig1}, + {is_batch, IsBatch} + | Config + ] end. end_per_testcase(_Testcase, Config) -> @@ -173,10 +184,18 @@ end_per_testcase(_Testcase, Config) -> ProxyPort = ?config(proxy_port, Config), ok = snabbkaffe:stop(), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - ok = delete_all_bridges(). + ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(). t_create_delete_bridge(Config) -> - Name = <<"mybridge">>, + Pid = erlang:whereis(eredis_sentinel), + ct:pal("t_create_detele_bridge:~p~n", [ + #{ + config => Config, + sentinel => Pid, + eredis_sentinel => Pid =/= undefined andalso erlang:process_info(Pid) + } + ]), + Name = ?config(bridge_name, Config), Type = ?config(connector_type, Config), BridgeConfig = ?config(bridge_config, Config), IsBatch = ?config(is_batch, Config), @@ -184,13 +203,11 @@ t_create_delete_bridge(Config) -> {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), - ResourceId = emqx_bridge_resource:resource_id(Type, Name), - ?WAIT( {ok, connected}, emqx_resource:health_check(ResourceId), - 5 + 10 ), RedisType = atom_to_binary(Type), @@ -244,7 +261,7 @@ t_check_values(_Config) -> ). t_check_replay(Config) -> - Name = <<"toxic_bridge">>, + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, Topic = <<"local_topic/test">>, ProxyName = "redis_single_tcp", @@ -324,15 +341,15 @@ t_permanent_error(_Config) -> ), ok = emqx_bridge:remove(Type, Name). -t_auth_username_password(_Config) -> - Name = <<"mybridge">>, +t_auth_username_password(Config) -> + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, - ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeConfig = username_password_redis_bridge_config(), ?assertMatch( {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), ?WAIT( {ok, connected}, emqx_resource:health_check(ResourceId), @@ -340,16 +357,16 @@ t_auth_username_password(_Config) -> ), ok = emqx_bridge:remove(Type, Name). -t_auth_error_username_password(_Config) -> - Name = <<"mybridge">>, +t_auth_error_username_password(Config) -> + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, - ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeConfig0 = username_password_redis_bridge_config(), BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}), ?assertMatch( {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), ?WAIT( {ok, disconnected}, emqx_resource:health_check(ResourceId), @@ -361,16 +378,16 @@ t_auth_error_username_password(_Config) -> ), ok = emqx_bridge:remove(Type, Name). -t_auth_error_password_only(_Config) -> - Name = <<"mybridge">>, +t_auth_error_password_only(Config) -> + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, - ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeConfig0 = toxiproxy_redis_bridge_config(), BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}), ?assertMatch( {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), ?assertEqual( {ok, disconnected}, emqx_resource:health_check(ResourceId) @@ -382,7 +399,7 @@ t_auth_error_password_only(_Config) -> ok = emqx_bridge:remove(Type, Name). t_create_disconnected(Config) -> - Name = <<"toxic_bridge">>, + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, ?check_trace( @@ -450,10 +467,8 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> added_msgs(ResourceId, BaseTopic, Payload) -> lists:flatmap( fun(K) -> - {ok, Results} = emqx_resource:simple_sync_query( - ResourceId, - {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]} - ), + Message = {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}, + {ok, Results} = emqx_resource:simple_sync_query(ResourceId, Message), [El || El <- Results, El =:= Payload] end, [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)] @@ -482,14 +497,6 @@ delete_all_rules() -> emqx_rule_engine:get_rules() ). -delete_all_bridges() -> - lists:foreach( - fun(#{name := Name, type := Type}) -> - emqx_bridge:remove(Type, Name) - end, - emqx_bridge:list() - ). - all_test_hosts() -> Confs = [ ?REDIS_TOXYPROXY_CONNECT_CONFIG @@ -554,12 +561,12 @@ redis_connect_configs() -> tcp => #{ <<"servers">> => <<"redis-sentinel:26379">>, <<"redis_type">> => <<"sentinel">>, - <<"sentinel">> => <<"mymaster">> + <<"sentinel">> => <<"mytcpmaster">> }, tls => #{ <<"servers">> => <<"redis-sentinel-tls:26380">>, <<"redis_type">> => <<"sentinel">>, - <<"sentinel">> => <<"mymaster">>, + <<"sentinel">> => <<"mytlsmaster">>, <<"ssl">> => redis_connect_ssl_opts(redis_sentinel) } }, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index cc78829e7..99f76f635 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -9,6 +9,9 @@ stdlib, ecpool, emqx_resource, + eredis, + %% eredis_cluster has supervisor should be started before emqx_connector + %% otherwise the first start redis_cluster will fail. eredis_cluster, ehttpc, jose, diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 389623b0a..1d16fd1a1 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -41,6 +41,8 @@ resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; resource_type(timescale) -> emqx_postgresql; +resource_type(redis) -> + emqx_bridge_redis_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -138,6 +140,14 @@ connector_structs() -> desc => <<"Matrix Connector Config">>, required => false } + )}, + {redis, + mk( + hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")), + #{ + desc => <<"Redis Connector Config">>, + required => false + } )} ]. @@ -164,7 +174,8 @@ schema_modules() -> emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, emqx_bridge_timescale, - emqx_postgresql_connector_schema + emqx_postgresql_connector_schema, + emqx_bridge_redis_schema ]. api_schemas(Method) -> @@ -188,7 +199,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), - api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector") + api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), + api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d6f8608ae..9759c8f0a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -71,16 +71,28 @@ enterprise_fields_connectors() -> []. -endif. -connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; -connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; -connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; -connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; -connector_type_to_bridge_types(matrix) -> [matrix]; -connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; -connector_type_to_bridge_types(pgsql) -> [pgsql]; -connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; -connector_type_to_bridge_types(syskeeper_proxy) -> []; -connector_type_to_bridge_types(timescale) -> [timescale]. +connector_type_to_bridge_types(azure_event_hub_producer) -> + [azure_event_hub_producer]; +connector_type_to_bridge_types(confluent_producer) -> + [confluent_producer]; +connector_type_to_bridge_types(gcp_pubsub_producer) -> + [gcp_pubsub, gcp_pubsub_producer]; +connector_type_to_bridge_types(kafka_producer) -> + [kafka, kafka_producer]; +connector_type_to_bridge_types(matrix) -> + [matrix]; +connector_type_to_bridge_types(mongodb) -> + [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(pgsql) -> + [pgsql]; +connector_type_to_bridge_types(syskeeper_forwarder) -> + [syskeeper_forwarder]; +connector_type_to_bridge_types(syskeeper_proxy) -> + []; +connector_type_to_bridge_types(timescale) -> + [timescale]; +connector_type_to_bridge_types(redis) -> + [redis, redis_single, redis_sentinel, redis_cluster]. actions_config_name() -> <<"actions">>. @@ -125,7 +137,7 @@ split_bridge_to_connector_and_action( BridgeType, BridgeV1Conf ); false -> - %% We do an automatic transfomation to get the connector config + %% We do an automatic transformation to get the connector config %% if the callback is not defined. %% Get connector fields from bridge config lists:foldl( diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6b3d5b4fa..022c6fcb0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -844,7 +844,7 @@ parse_object_loop(PropList0, Module, Options) -> ), parse_object_loop(PropList, Module, Options, _Props = [], _Required = [], _Refs = []). -parse_object_loop([], _Modlue, _Options, Props, Required, Refs) -> +parse_object_loop([], _Module, _Options, Props, Required, Refs) -> {lists:reverse(Props), lists:usort(Required), Refs}; parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs) -> NameBin = to_bin(Name), diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 44137546d..25d64e2fa 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -20,7 +20,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). --export([roots/0, fields/1]). +-export([roots/0, fields/1, redis_fields/0, desc/1]). -behaviour(emqx_resource). @@ -50,55 +50,46 @@ roots() -> {config, #{ type => hoconsc:union( [ - hoconsc:ref(?MODULE, cluster), - hoconsc:ref(?MODULE, single), - hoconsc:ref(?MODULE, sentinel) + ?R_REF(redis_cluster), + ?R_REF(redis_single), + ?R_REF(redis_sentinel) ] ) }} ]. -fields(single) -> - [ - {server, server()}, - {redis_type, #{ - type => single, - default => single, - required => false, - desc => ?DESC("single") - }} - ] ++ +fields(redis_single) -> + fields(redis_single_connector) ++ redis_fields() ++ emqx_connector_schema_lib:ssl_fields(); -fields(cluster) -> +fields(redis_single_connector) -> [ - {servers, servers()}, - {redis_type, #{ - type => cluster, - default => cluster, - required => false, - desc => ?DESC("cluster") - }} - ] ++ + {server, server()}, + redis_type(single) + ]; +fields(redis_cluster) -> + fields(redis_cluster_connector) ++ lists:keydelete(database, 1, redis_fields()) ++ emqx_connector_schema_lib:ssl_fields(); -fields(sentinel) -> +fields(redis_cluster_connector) -> [ {servers, servers()}, - {redis_type, #{ - type => sentinel, - default => sentinel, - required => false, - desc => ?DESC("sentinel") - }}, + redis_type(cluster) + ]; +fields(redis_sentinel) -> + fields(redis_sentinel_connector) ++ + redis_fields() ++ + emqx_connector_schema_lib:ssl_fields(); +fields(redis_sentinel_connector) -> + [ + {servers, servers()}, + redis_type(sentinel), {sentinel, #{ type => string(), required => true, desc => ?DESC("sentinel_desc") }} - ] ++ - redis_fields() ++ - emqx_connector_schema_lib:ssl_fields(). + ]. server() -> Meta = #{desc => ?DESC("server")}, @@ -108,64 +99,52 @@ servers() -> Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS). +desc(redis_cluster_connector) -> + ?DESC(redis_cluster_connector); +desc(redis_single_connector) -> + ?DESC(redis_single_connector); +desc(redis_sentinel_connector) -> + ?DESC(redis_sentinel_connector); +desc(_) -> + undefined. + %% =================================================================== +redis_type(Type) -> + {redis_type, #{ + type => Type, + default => Type, + required => false, + desc => ?DESC(Type) + }}. + callback_mode() -> always_sync. -on_start( - InstId, - #{ - redis_type := Type, - pool_size := PoolSize, - ssl := SSL - } = Config -) -> +on_start(InstId, Config0) -> ?SLOG(info, #{ msg => "starting_redis_connector", connector => InstId, - config => emqx_utils:redact(Config) + config => emqx_utils:redact(Config0) }), - ConfKey = - case Type of - single -> server; - _ -> servers - end, - Servers0 = maps:get(ConfKey, Config), - Servers1 = lists:map( - fun(#{hostname := Host, port := Port}) -> - {Host, Port} - end, - emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS) - ), - Servers = [{servers, Servers1}], - Database = - case Type of - cluster -> []; - _ -> [{database, maps:get(database, Config)}] - end, + Config = config(Config0), + #{pool_size := PoolSize, ssl := SSL, redis_type := Type} = Config, + Options = ssl_options(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}], Opts = [ - {pool_size, PoolSize}, {username, maps:get(username, Config, undefined)}, {password, maps:get(password, Config, "")}, + {servers, servers(Config)}, + {options, Options}, + {pool_size, PoolSize}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} - ] ++ Database ++ Servers, - Options = - case maps:get(enable, SSL) of - true -> - [ - {ssl, true}, - {ssl_options, emqx_tls_lib:to_client_opts(SSL)} - ]; - false -> - [{ssl, false}] - end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], + ] ++ database(Config), + State = #{pool_name => InstId, type => Type}, ok = emqx_resource:allocate_resource(InstId, type, Type), ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case Type of cluster -> - case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of + case eredis_cluster:start_pool(InstId, Opts) of {ok, _} -> {ok, State}; {ok, _, _} -> @@ -174,7 +153,7 @@ on_start( {error, Reason} end; _ -> - case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of + case emqx_resource_pool:start(InstId, ?MODULE, Opts) of ok -> {ok, State}; {error, Reason} -> @@ -182,6 +161,14 @@ on_start( end end. +ssl_options(SSL = #{enable := true}) -> + [ + {ssl, true}, + {ssl_options, emqx_tls_lib:to_client_opts(SSL)} + ]; +ssl_options(#{enable := false}) -> + [{ssl, false}]. + on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_redis_connector", @@ -189,7 +176,11 @@ on_stop(InstId, _State) -> }), case emqx_resource:get_allocated_resources(InstId) of #{pool_name := PoolName, type := cluster} -> - eredis_cluster:stop_pool(PoolName); + case eredis_cluster:stop_pool(PoolName) of + {error, not_found} -> ok; + ok -> ok; + Error -> Error + end; #{pool_name := PoolName, type := _} -> emqx_resource_pool:stop(PoolName); _ -> @@ -244,8 +235,17 @@ is_unrecoverable_error(_) -> on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> case eredis_cluster:pool_exists(PoolName) of true -> - Health = eredis_cluster:ping_all(PoolName), - status_result(Health); + %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster. + %% we need restart eredis_cluster pool when pool_worker(slot) is empty. + %% If the pool is empty, it means that there are no workers attempting to reconnect. + %% In this case, we can directly consider it as a disconnect and then proceed to reconnect. + case eredis_cluster_monitor:get_all_pools(PoolName) of + [] -> + disconnected; + [_ | _] -> + Health = eredis_cluster:ping_all(PoolName), + status_result(Health) + end; false -> disconnected end; @@ -289,6 +289,28 @@ wrap_qp_result(Results) when is_list(Results) -> end. %% =================================================================== +%% parameters for connector +config(#{parameters := #{} = Param} = Config) -> + maps:merge(maps:remove(parameters, Config), Param); +%% is for authn/authz +config(Config) -> + Config. + +servers(#{server := Server}) -> + servers(Server); +servers(#{servers := Servers}) -> + servers(Servers); +servers(Servers) -> + lists:map( + fun(#{hostname := Host, port := Port}) -> + {Host, Port} + end, + emqx_schema:parse_servers(Servers, ?REDIS_HOST_OPTIONS) + ). + +database(#{redis_type := cluster}) -> []; +database(#{database := Database}) -> [{database, Database}]. + connect(Opts) -> eredis:start_link(Opts). diff --git a/apps/emqx_redis/test/emqx_redis_SUITE.erl b/apps/emqx_redis/test/emqx_redis_SUITE.erl index 8fcbf2b63..e7ce7dd4f 100644 --- a/apps/emqx_redis/test/emqx_redis_SUITE.erl +++ b/apps/emqx_redis/test/emqx_redis_SUITE.erl @@ -223,7 +223,7 @@ redis_config_base(Type, ServerKey) -> "sentinel" -> Host = ?REDIS_SENTINEL_HOST, Port = ?REDIS_SENTINEL_PORT, - MaybeSentinel = " sentinel = mymaster\n", + MaybeSentinel = " sentinel = mytcpmaster\n", MaybeDatabase = " database = 1\n"; "single" -> Host = ?REDIS_SINGLE_HOST, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 7feacee77..8b27cdda4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -13,7 +13,11 @@ uuid, emqx, emqx_utils, - emqx_ctl + emqx_ctl, + %% rule_engine should wait for bridge connector start, + %% it's will check action/connector ref's exist. + emqx_bridge, + emqx_connector ]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, diff --git a/rel/i18n/emqx_bridge_redis.hocon b/rel/i18n/emqx_bridge_redis.hocon index 05c8d95a6..03831b02f 100644 --- a/rel/i18n/emqx_bridge_redis.hocon +++ b/rel/i18n/emqx_bridge_redis.hocon @@ -32,14 +32,19 @@ desc_type.desc: desc_type.label: """Bridge Type""" -local_topic.desc: +desc_local_topic.desc: """The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic matching the local_topic will be forwarded.
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that match local_topic will be forwarded.""" -local_topic.label: +desc_local_topic.label: """Local Topic""" +desc_action_parameters.desc: +"""The parameters of the action.""" +desc_action_parameters.label: +"""Action Parameters""" + } diff --git a/rel/i18n/emqx_bridge_redis_schema.hocon b/rel/i18n/emqx_bridge_redis_schema.hocon new file mode 100644 index 000000000..861c0c185 --- /dev/null +++ b/rel/i18n/emqx_bridge_redis_schema.hocon @@ -0,0 +1,41 @@ +emqx_bridge_redis_schema { +redis_parameters.label: +"""Redis Type Specific Parameters""" + +redis_parameters.desc: +"""Set of parameters specific for the given type of this Redis connector, `redis_type` can be one of `single`, `cluster` or `sentinel`.""" + +producer_action.desc: +"""The parameters of the action.""" +producer_action.label: +"""Action Parameters""" + +redis_type.label: +"""Redis Type""" +redis_type.desc: +"""Single mode. Must be set to 'single' when Redis server is running in single mode. +Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode. +Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode.""" + +batch_size.label: +"""Batch Size""" +batch_size.desc: +"""This parameter defines the upper limit of the batch count. +Setting this value to 1 effectively disables batching, as it indicates that only one item will be processed per batch. +Note on Redis Cluster Mode: +In the context of Redis Cluster Mode, it is important to note that batching is not supported. +Consequently, the batch_size is always set to 1, +reflecting the mode inherent limitation in handling batch operations.""" + +batch_time.desc: +"""Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage.""" + +batch_time.label: +"""Max batch wait time, disable when in Redis Cluster Mode.""" + +redis_action.label: +"""Redis Action""" +redis_action.desc: +"""Action to interact with a Redis connector.""" + +} diff --git a/rel/i18n/emqx_connector_schema.hocon b/rel/i18n/emqx_connector_schema.hocon index 16d153e12..24baefd89 100644 --- a/rel/i18n/emqx_connector_schema.hocon +++ b/rel/i18n/emqx_connector_schema.hocon @@ -1,5 +1,11 @@ emqx_connector_schema { +config_enable.desc: +"""Enable (true) or disable (false) this connector.""" + +config_enable.label: +"""Enable or Disable""" + desc_connectors.desc: """Connectors that are used to connect to external systems""" desc_connectors.label: diff --git a/rel/i18n/emqx_redis.hocon b/rel/i18n/emqx_redis.hocon index af20c5980..28f531094 100644 --- a/rel/i18n/emqx_redis.hocon +++ b/rel/i18n/emqx_redis.hocon @@ -47,4 +47,18 @@ single.desc: single.label: """Single Mode""" +redis_cluster_connector.label: +"""Redis Cluster Connector""" +redis_cluster_connector.desc: +"""Redis connector in cluster mode""" + +redis_sentinel_connector.label: +"""Redis Sentinel Connector""" +redis_sentinel_connector.desc: +"""Redis connector in sentinel mode""" + +redis_single_connector.label: +"""Redis Single Connector""" +redis_single_connector.desc: +"""Redis connector in sentinel mode""" } diff --git a/scripts/pre-compile.sh b/scripts/pre-compile.sh index dfad7c869..ce2c532ca 100755 --- a/scripts/pre-compile.sh +++ b/scripts/pre-compile.sh @@ -27,10 +27,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)" +UPDATE_I18N=${UPDATE_I18N:-true} # download desc (i18n) translations -curl -L --silent --show-error \ - --output "apps/emqx_dashboard/priv/desc.zh.hocon" \ - "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon" +if [ "$UPDATE_I18N" = "true" ]; then + echo "updating i18n file from emqx-i18n repo" + start=$(date +%s) + curl -L --silent --show-error \ + --output "apps/emqx_dashboard/priv/desc.zh.hocon" \ + "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon" + end=$(date +%s) + duration=$(echo "$end $start" | awk '{print $1 - $2}') + echo "updated i18n file using $duration seconds, set UPDATE_I18N=false to skip" +else + echo "skipping update i18n file from emqx-i18n repo, set UPDATE_I18N=true to update" +fi # TODO # make sbom a build artifcat