From e3fbf6c958f78a4aa1e3734e23264b746ce6ea3f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 24 Nov 2023 17:43:01 -0300 Subject: [PATCH 1/8] test: attempting to stabilize more flaky tests --- .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index d82a61fee..24ec3ec75 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -208,7 +208,7 @@ consumer_config(TestCase, Config) -> " resource_opts {\n" " health_check_interval = \"1s\"\n" %% to fail and retry pulling faster - " request_ttl = \"5s\"\n" + " request_ttl = \"1s\"\n" " }\n" "}\n", [ @@ -285,7 +285,7 @@ start_control_client() -> connect_timeout => 5_000, max_retries => 0, pool_size => 1, - resource_opts => #{request_ttl => 5_000}, + resource_opts => #{request_ttl => 1_000}, service_account_json => RawServiceAccount }, PoolName = <<"control_connector">>, @@ -1265,11 +1265,12 @@ t_multiple_pull_workers(Config) -> <<"consumer">> => #{ %% reduce flakiness <<"ack_deadline">> => <<"10m">>, + <<"ack_retry_interval">> => <<"1s">>, <<"consumer_workers_per_topic">> => NConsumers }, <<"resource_opts">> => #{ %% reduce flakiness - <<"request_ttl">> => <<"15s">> + <<"request_ttl">> => <<"4s">> } } ), @@ -1888,7 +1889,10 @@ t_connection_down_during_ack(Config) -> {{ok, _}, {ok, _}} = ?wait_async_action( - create_bridge(Config), + create_bridge( + Config, + #{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}} + ), #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 10_000 ), @@ -2026,7 +2030,10 @@ t_connection_down_during_pull(Config) -> {{ok, _}, {ok, _}} = ?wait_async_action( - create_bridge(Config), + create_bridge( + Config, + #{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}} + ), #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 10_000 ), From bd40b5c553c87b6656bf61370854f1d6a567295f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Nov 2023 14:45:18 -0300 Subject: [PATCH 2/8] test(flaky): more adjustments --- ...emqx_bridge_gcp_pubsub_consumer_worker.erl | 1 - .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 29 +++++++++++++++---- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 84a4e6d13..6b64a02e9 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -478,7 +478,6 @@ do_pull_async(State0) -> Body = body(State0, pull), PreparedRequest = {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, - %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers... Res = emqx_bridge_gcp_pubsub_client:query_async( PreparedRequest, ReplyFunAndArgs, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 24ec3ec75..7e90ab48a 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -512,10 +512,16 @@ wait_acked(Opts) -> %% no need to check return value; we check the property in %% the check phase. this is just to give it a chance to do %% so and avoid flakiness. should be fast. - snabbkaffe:block_until( + Res = snabbkaffe:block_until( ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}), Timeout ), + case Res of + {ok, _} -> + ok; + {timeout, Evts} -> + ct:pal("timed out waiting for acks; received:\n ~p", [Evts]) + end, ok. wait_forgotten() -> @@ -1270,7 +1276,7 @@ t_multiple_pull_workers(Config) -> }, <<"resource_opts">> => #{ %% reduce flakiness - <<"request_ttl">> => <<"4s">> + <<"request_ttl">> => <<"11s">> } } ), @@ -1532,11 +1538,12 @@ t_async_worker_death_mid_pull(Config) -> ct:pal("published message"), AsyncWorkerPids = get_async_worker_pids(Config), + Timeout = 20_000, emqx_utils:pmap( fun(AsyncWorkerPid) -> Ref = monitor(process, AsyncWorkerPid), ct:pal("killing pid ~p", [AsyncWorkerPid]), - sys:terminate(AsyncWorkerPid, die, 20_000), + sys:terminate(AsyncWorkerPid, die, Timeout), receive {'DOWN', Ref, process, AsyncWorkerPid, _} -> ct:pal("killed pid ~p", [AsyncWorkerPid]), @@ -1545,7 +1552,8 @@ t_async_worker_death_mid_pull(Config) -> end, ok end, - AsyncWorkerPids + AsyncWorkerPids, + Timeout + 2_000 ), ok @@ -1559,7 +1567,13 @@ t_async_worker_death_mid_pull(Config) -> ?wait_async_action( create_bridge( Config, - #{<<"pool_size">> => 1} + #{ + <<"pool_size">> => 1, + <<"consumer">> => #{ + <<"ack_deadline">> => <<"10s">>, + <<"ack_retry_interval">> => <<"1s">> + } + } ), #{?snk_kind := gcp_pubsub_consumer_worker_init}, 10_000 @@ -2032,7 +2046,10 @@ t_connection_down_during_pull(Config) -> ?wait_async_action( create_bridge( Config, - #{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}} + #{ + <<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}, + <<"resource_opts">> => #{<<"request_ttl">> => <<"11s">>} + } ), #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 10_000 From 64f6220cf5e1ddf38b4a1986bf80eb93de7ce573 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 28 Nov 2023 12:09:53 +0100 Subject: [PATCH 3/8] feat(quic): bump to quicer-0.0.303 and emqtt 1.9.7 --- apps/emqx/rebar.config | 4 ++-- apps/emqx/rebar.config.script | 2 +- apps/emqx/src/emqx_quic_stream.erl | 2 +- apps/emqx/test/emqx_quic_multistreams_SUITE.erl | 15 +++++++-------- changes/ce/feat-12040.en.md | 2 ++ mix.exs | 4 ++-- rebar.config | 2 +- rebar.config.erl | 2 +- 8 files changed, 17 insertions(+), 16 deletions(-) create mode 100644 changes/ce/feat-12040.en.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 71f581267..5400cda2f 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -45,7 +45,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.1"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}} ]}, {extra_src_dirs, [{"test", [recursive]}, {"integration_test", [recursive]}]} @@ -55,7 +55,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.1"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}} ]}, {extra_src_dirs, [{"test", [recursive]}]} ]} diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index 174663e80..d68998c4c 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -24,7 +24,7 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}. +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.303"}}}. Dialyzer = fun(Config) -> {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config), diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 05413b0cf..63af45b1b 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -184,7 +184,7 @@ peer_send_aborted(Stream, ErrorCode, S) -> -spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret(). peer_send_shutdown(Stream, undefined, S) -> - ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0), + _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0), {ok, S}. -spec send_complete(stream_handle(), boolean(), cb_data()) -> cb_ret(). diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index c5eaf4c24..b2205a659 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -669,22 +669,21 @@ t_multi_streams_packet_malform(Config) -> case quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>) of {ok, 10} -> ok; {error, cancelled} -> ok; - {error, stm_send_error, aborted} -> ok + {error, stm_send_error, aborted} -> ok; + {error, closed} -> ok end, ?assert(is_list(emqtt:info(C))), - - {error, stm_send_error, _} = + {error, closed} = snabbkaffe:retry( 10000, 10, fun() -> - {error, stm_send_error, _} = quicer:send( + {error, closed} = quicer:send( MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>> ) end ), - ?assert(is_list(emqtt:info(C))), ok = emqtt:disconnect(C). @@ -770,9 +769,9 @@ t_multi_streams_packet_too_large(Config) -> timeout = recv_pub(1), ?assert(is_list(emqtt:info(C))), - %% Connection could be kept - {error, stm_send_error, _} = quicer:send(via_stream(PubVia), <<1>>), - {error, stm_send_error, _} = quicer:send(via_stream(PubVia2), <<1>>), + %% Connection could be kept but data stream are closed! + {error, closed} = quicer:send(via_stream(PubVia), <<1>>), + {error, closed} = quicer:send(via_stream(PubVia2), <<1>>), %% We could send data over new stream {ok, PubVia3} = emqtt:start_data_stream(C, []), ok = emqtt:publish_async( diff --git a/changes/ce/feat-12040.en.md b/changes/ce/feat-12040.en.md new file mode 100644 index 000000000..de092139a --- /dev/null +++ b/changes/ce/feat-12040.en.md @@ -0,0 +1,2 @@ +Upgrade QUIC stack, more features on the way! + diff --git a/mix.exs b/mix.exs index 2ca4bc3e2..123273266 100644 --- a/mix.exs +++ b/mix.exs @@ -64,7 +64,7 @@ defmodule EMQXUmbrella.MixProject do {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer {:emqtt, - github: "emqx/emqtt", tag: "1.9.1", override: true, system_env: maybe_no_quic_env()}, + github: "emqx/emqtt", tag: "1.9.7", override: true, system_env: maybe_no_quic_env()}, {:rulesql, github: "emqx/rulesql", tag: "0.1.7"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, @@ -830,7 +830,7 @@ defmodule EMQXUmbrella.MixProject do defp quicer_dep() do if enable_quicer?(), # in conflict with emqx and emqtt - do: [{:quicer, github: "emqx/quic", tag: "0.0.202", override: true}], + do: [{:quicer, github: "emqx/quic", tag: "0.0.303", override: true}], else: [] end diff --git a/rebar.config b/rebar.config index 3a887e941..efdba5eaa 100644 --- a/rebar.config +++ b/rebar.config @@ -69,7 +69,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.1"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.7"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} diff --git a/rebar.config.erl b/rebar.config.erl index 98e29f32a..c3ace671c 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -39,7 +39,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.1"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.303"}}}. jq() -> {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.11"}}}. From e5508105c72b81076d54b71fc83b3f4f3725a607 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 28 Nov 2023 18:24:06 +0300 Subject: [PATCH 4/8] fix(redis): start and load eredis app --- apps/emqx_redis/src/emqx_redis.app.src | 1 + changes/ce/fix-12044.en.md | 1 + 2 files changed, 2 insertions(+) create mode 100644 changes/ce/fix-12044.en.md diff --git a/apps/emqx_redis/src/emqx_redis.app.src b/apps/emqx_redis/src/emqx_redis.app.src index c9513bcf9..e51c0fa80 100644 --- a/apps/emqx_redis/src/emqx_redis.app.src +++ b/apps/emqx_redis/src/emqx_redis.app.src @@ -5,6 +5,7 @@ {applications, [ kernel, stdlib, + eredis, eredis_cluster, emqx_connector, emqx_resource diff --git a/changes/ce/fix-12044.en.md b/changes/ce/fix-12044.en.md new file mode 100644 index 000000000..89f114215 --- /dev/null +++ b/changes/ce/fix-12044.en.md @@ -0,0 +1 @@ +Fix Redis authorization, authentication, and bridges. Previously connections to Redis servers could not be established because driver was not properly loaded. From 0aec2f760574d9f34757fc4bfaf6e24e2713b104 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 23 Nov 2023 11:00:07 +0800 Subject: [PATCH 5/8] feat: redis bridge v2 --- .../redis/sentinel-tcp/sentinel-base.conf | 8 +- .../redis/sentinel-tls/sentinel-base.conf | 8 +- .../src/emqx_authn_redis_schema.erl | 8 +- .../src/emqx_authz_redis_schema.erl | 12 +- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- .../src/schema/emqx_bridge_v2_schema.erl | 6 +- .../src/emqx_bridge_redis.app.src | 6 +- .../src/emqx_bridge_redis.erl | 26 +- .../src/emqx_bridge_redis_action_info.erl | 98 +++++++ .../src/emqx_bridge_redis_connector.erl | 139 +++++---- .../src/emqx_bridge_redis_schema.erl | 276 ++++++++++++++++++ .../test/emqx_bridge_redis_SUITE.erl | 75 ++--- .../emqx_connector/src/emqx_connector.app.src | 3 + .../src/schema/emqx_connector_ee_schema.erl | 16 +- .../src/schema/emqx_connector_schema.erl | 34 ++- .../src/emqx_dashboard_swagger.erl | 2 +- apps/emqx_redis/src/emqx_redis.erl | 176 ++++++----- apps/emqx_redis/test/emqx_redis_SUITE.erl | 2 +- .../src/emqx_rule_engine.app.src | 6 +- rel/i18n/emqx_bridge_redis.hocon | 9 +- rel/i18n/emqx_bridge_redis_schema.hocon | 41 +++ rel/i18n/emqx_connector_schema.hocon | 6 + rel/i18n/emqx_redis.hocon | 14 + scripts/pre-compile.sh | 16 +- 24 files changed, 764 insertions(+), 226 deletions(-) create mode 100644 apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl create mode 100644 apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl create mode 100644 rel/i18n/emqx_bridge_redis_schema.hocon diff --git a/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf b/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf index 419f2a935..c43de536b 100644 --- a/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf +++ b/.ci/docker-compose-file/redis/sentinel-tcp/sentinel-base.conf @@ -1,7 +1,7 @@ sentinel resolve-hostnames yes bind :: 0.0.0.0 -sentinel monitor mymaster redis-sentinel-master 6379 1 -sentinel auth-pass mymaster public -sentinel down-after-milliseconds mymaster 10000 -sentinel failover-timeout mymaster 20000 +sentinel monitor mytcpmaster redis-sentinel-master 6379 1 +sentinel auth-pass mytcpmaster public +sentinel down-after-milliseconds mytcpmaster 10000 +sentinel failover-timeout mytcpmaster 20000 diff --git a/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf b/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf index 8363ae383..7ea32f805 100644 --- a/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf +++ b/.ci/docker-compose-file/redis/sentinel-tls/sentinel-base.conf @@ -8,7 +8,7 @@ tls-key-file /etc/certs/key.pem tls-ca-cert-file /etc/certs/cacert.pem tls-auth-clients no -sentinel monitor mymaster redis-sentinel-tls-master 6389 1 -sentinel auth-pass mymaster public -sentinel down-after-milliseconds mymaster 10000 -sentinel failover-timeout mymaster 20000 +sentinel monitor mytlsmaster redis-sentinel-tls-master 6389 1 +sentinel auth-pass mytlsmaster public +sentinel down-after-milliseconds mytlsmaster 10000 +sentinel failover-timeout mytlsmaster 20000 diff --git a/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl b/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl index f3e124ca1..b72905f6b 100644 --- a/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl +++ b/apps/emqx_auth_redis/src/emqx_authn_redis_schema.erl @@ -64,12 +64,8 @@ refs(_) -> expected => "single | cluster | sentinel" }). -fields(redis_single) -> - common_fields() ++ emqx_redis:fields(single); -fields(redis_cluster) -> - common_fields() ++ emqx_redis:fields(cluster); -fields(redis_sentinel) -> - common_fields() ++ emqx_redis:fields(sentinel). +fields(Type) -> + common_fields() ++ emqx_redis:fields(Type). desc(redis_single) -> ?DESC(single); diff --git a/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl b/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl index 5cd084795..96949b0ea 100644 --- a/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl +++ b/apps/emqx_auth_redis/src/emqx_authz_redis_schema.erl @@ -34,17 +34,9 @@ namespace() -> "authz". type() -> ?AUTHZ_TYPE. -fields(redis_single) -> +fields(Type) -> emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ - emqx_redis:fields(single) ++ - [{cmd, cmd()}]; -fields(redis_sentinel) -> - emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ - emqx_redis:fields(sentinel) ++ - [{cmd, cmd()}]; -fields(redis_cluster) -> - emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ - emqx_redis:fields(cluster) ++ + emqx_redis:fields(Type) ++ [{cmd, cmd()}]. desc(redis_single) -> diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 4f195b417..ddb8424ae 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -81,7 +81,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_mongodb_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, - emqx_bridge_timescale_action_info + emqx_bridge_timescale_action_info, + emqx_bridge_redis_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 188a550fc..2c2dde4da 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -242,17 +242,17 @@ schema_homogeneous_test() -> is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> Fields = Module:fields(TypeName), ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()), - MissingFileds = lists:filter( + MissingFields = lists:filter( fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames ), - case MissingFileds of + case MissingFields of [] -> false; _ -> {true, #{ schema_module => Module, type_name => TypeName, - missing_fields => MissingFileds + missing_fields => MissingFields }} end. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 5b6163969..53130d188 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, @@ -9,7 +9,9 @@ emqx_resource, emqx_redis ]}, - {env, []}, + {env, [ + {emqx_action_info_modules, [emqx_bridge_redis_action_info]} + ]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl index 1c8ee75f9..beafc8775 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl @@ -8,9 +8,9 @@ -import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). --export([ - conn_bridge_examples/1 -]). +-export([conn_bridge_examples/1]). + +-export([type_name_fields/1, connector_fields/1]). -export([ namespace/0, @@ -100,6 +100,8 @@ namespace() -> "bridge_redis". roots() -> []. +fields(action_parameters) -> + [{command_template, fun command_template/1}]; fields("post_single") -> method_fields(post, redis_single); fields("post_sentinel") -> @@ -142,21 +144,13 @@ method_fields(put, ConnectorType) -> redis_bridge_common_fields(Type) -> emqx_bridge_schema:common_bridge_fields() ++ [ - {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {command_template, fun command_template/1} + {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})} + | fields(action_parameters) ] ++ resource_fields(Type). connector_fields(Type) -> - RedisType = bridge_type_to_redis_conn_type(Type), - emqx_redis:fields(RedisType). - -bridge_type_to_redis_conn_type(redis_single) -> - single; -bridge_type_to_redis_conn_type(redis_sentinel) -> - sentinel; -bridge_type_to_redis_conn_type(redis_cluster) -> - cluster. + emqx_redis:fields(Type). type_name_fields(Type) -> [ @@ -168,7 +162,7 @@ resource_fields(Type) -> [ {resource_opts, mk( - ref("creation_opts_" ++ atom_to_list(Type)), + ?R_REF("creation_opts_" ++ atom_to_list(Type)), #{ required => false, default => #{}, @@ -185,6 +179,8 @@ resource_creation_fields("redis_cluster") -> resource_creation_fields(_) -> emqx_resource_schema:fields("creation_opts"). +desc(action_parameters) -> + ?DESC("desc_action_parameters"); desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl new file mode 100644 index 000000000..22ed40093 --- /dev/null +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl @@ -0,0 +1,98 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_redis_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_action_config/2, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name_fun/1 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_redis_schema). +-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). + +action_type_name() -> redis. + +connector_type_name() -> redis. + +schema_module() -> ?SCHEMA_MODULE. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + fix_v1_type( + maps:merge( + maps:without( + [<<"connector">>], + map_unindent(<<"parameters">>, ActionConfig) + ), + map_unindent(<<"parameters">>, ConnectorConfig) + ) + ). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), + ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + ActionConfig#{<<"connector">> => ConnectorName}. + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), + ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields("config_connector")), + %% Need put redis_type into parameter. + %% cluster need type to filter resource_opts + ConnectorKeys = + (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)) ++ + [<<"redis_type">>], + ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, + make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config). + +%%------------------------------------------------------------------------------------------ +%% Internal helper fns +%%------------------------------------------------------------------------------------------ + +bridge_v1_type_name() -> + {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}. +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) -> + v1_type(Type). + +fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) -> + Conf#{<<"type">> => v1_type(RedisType)}. + +v1_type(<<"single">>) -> redis_single; +v1_type(<<"sentinel">>) -> redis_sentinel; +v1_type(<<"cluster">>) -> redis_cluster. + +bridge_v1_type_names() -> [redis_single, redis_sentinel, redis_cluster]. + +map_unindent(Key, Map) -> + maps:merge( + maps:get(Key, Map), + maps:remove(Key, Map) + ). + +map_indent(IndentKey, PickKeys, Map) -> + maps:put( + IndentKey, + maps:with(PickKeys, Map), + maps:without(PickKeys, Map) + ). + +schema_keys(Schema) -> + [bin(Key) || {Key, _} <- Schema]. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + map_indent(<<"parameters">>, IndentKeys, Conf0). diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 696947726..4835e8127 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -4,6 +4,7 @@ -module(emqx_bridge_redis_connector). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(emqx_resource). @@ -11,11 +12,15 @@ %% callbacks of behaviour emqx_resource -export([ callback_mode/0, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, on_start/2, on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_get_channel_status/3 ]). %% ------------------------------------------------------------------------------------------------- @@ -24,7 +29,34 @@ callback_mode() -> always_sync. -on_start(InstId, #{command_template := CommandTemplate} = Config) -> +on_add_channel( + _InstanceId, + State = #{channels := Channels}, + ChannelId, + #{ + parameters := #{ + command_template := Template + } + } +) -> + Channels2 = Channels#{ + ChannelId => #{template => preproc_command_template(Template)} + }, + {ok, State#{channels => Channels2}}. + +on_remove_channel(_InstanceId, State = #{channels := Channels}, ChannelId) -> + {ok, State#{channels => maps:remove(ChannelId, Channels)}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_ConnectorResId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> ?status_connected; + false -> ?status_disconnected + end. + +on_start(InstId, Config) -> case emqx_redis:on_start(InstId, Config) of {ok, RedisConnSt} -> ?tp( @@ -33,7 +65,7 @@ on_start(InstId, #{command_template := CommandTemplate} = Config) -> ), {ok, #{ conn_st => RedisConnSt, - command_template => preproc_command_template(CommandTemplate) + channels => #{} }}; {error, {start_pool_failed, _, #{type := authentication_error, reason := Reason}}} = Error -> ?tp( @@ -57,14 +89,8 @@ on_stop(InstId, undefined = _State) -> on_get_status(InstId, #{conn_st := RedisConnSt}) -> emqx_redis:on_get_status(InstId, RedisConnSt). -on_query( - InstId, - {send_message, Data}, - _State = #{ - command_template := CommandTemplate, conn_st := RedisConnSt - } -) -> - Cmd = proc_command_template(CommandTemplate, Data), +%% raw cmd without template, for CI test +on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) -> ?tp( redis_bridge_connector_cmd, #{cmd => Cmd, batch => false, mode => sync} @@ -77,45 +103,68 @@ on_query( Result; on_query( InstId, - Query, - _State = #{conn_st := RedisConnSt} + {_MessageTag, _Data} = Msg, + #{channels := Channels, conn_st := RedisConnSt} ) -> - ?tp( - redis_bridge_connector_query, - #{query => Query, batch => false, mode => sync} - ), - Result = query(InstId, Query, RedisConnSt), - ?tp( - redis_bridge_connector_send_done, - #{query => Query, batch => false, mode => sync, result => Result} - ), - Result. + case try_render_message([Msg], Channels) of + {ok, [Cmd]} -> + ?tp( + redis_bridge_connector_cmd, + #{cmd => Cmd, batch => false, mode => sync} + ), + Result = query(InstId, {cmd, Cmd}, RedisConnSt), + ?tp( + redis_bridge_connector_send_done, + #{cmd => Cmd, batch => false, mode => sync, result => Result} + ), + Result; + Error -> + Error + end. on_batch_query( - InstId, BatchData, _State = #{command_template := CommandTemplate, conn_st := RedisConnSt} + InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt} ) -> - Cmds = process_batch_data(BatchData, CommandTemplate), - ?tp( - redis_bridge_connector_send, - #{batch_data => BatchData, batch => true, mode => sync} - ), - Result = query(InstId, {cmds, Cmds}, RedisConnSt), - ?tp( - redis_bridge_connector_send_done, - #{ - batch_data => BatchData, - batch_size => length(BatchData), - batch => true, - mode => sync, - result => Result - } - ), - Result. + case try_render_message(BatchData, Channels) of + {ok, Cmds} -> + ?tp( + redis_bridge_connector_send, + #{batch_data => BatchData, batch => true, mode => sync} + ), + Result = query(InstId, {cmds, Cmds}, RedisConnSt), + ?tp( + redis_bridge_connector_send_done, + #{ + batch_data => BatchData, + batch_size => length(BatchData), + batch => true, + mode => sync, + result => Result + } + ), + Result; + Error -> + Error + end. %% ------------------------------------------------------------------------------------------------- %% private helpers %% ------------------------------------------------------------------------------------------------- +try_render_message(Datas, Channels) -> + try_render_message(Datas, Channels, []). + +try_render_message([{MessageTag, Data} | T], Channels, Acc) -> + case maps:find(MessageTag, Channels) of + {ok, #{template := Template}} -> + Msg = proc_command_template(Template, Data), + try_render_message(T, Channels, [Msg | Acc]); + _ -> + {error, {unrecoverable_error, {invalid_message_tag, MessageTag}}} + end; +try_render_message([], _Channels, Acc) -> + {ok, lists:reverse(Acc)}. + query(InstId, Query, RedisConnSt) -> case emqx_redis:on_query(InstId, Query, RedisConnSt) of {ok, _} = Ok -> Ok; @@ -123,14 +172,6 @@ query(InstId, Query, RedisConnSt) -> {error, _} = Error -> Error end. -process_batch_data(BatchData, CommandTemplate) -> - lists:map( - fun({send_message, Data}) -> - proc_command_template(CommandTemplate, Data) - end, - BatchData - ). - proc_command_template(CommandTemplate, Msg) -> lists:map( fun(ArgTks) -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl new file mode 100644 index 000000000..f02bf3322 --- /dev/null +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -0,0 +1,276 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_redis_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-define(TYPE, redis). + +%% `hocon_schema' API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1, + resource_opts_converter/2 +]). + +%% `emqx_bridge_v2_schema' "unofficial" API +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + ?TYPE. + +roots() -> + []. + +%%========================================= +%% Action fields +%%========================================= +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + [ + {parameters, + ?HOCON( + hoconsc:union([ + ?R_REF(emqx_redis, redis_single_connector), + ?R_REF(emqx_redis, redis_sentinel_connector), + ?R_REF(emqx_redis, redis_cluster_connector) + ]), + #{required => true, desc => ?DESC(redis_parameters)} + )} + ] ++ + emqx_redis:redis_fields() ++ + emqx_connector_schema_lib:ssl_fields(); +fields(action) -> + {?TYPE, + ?HOCON( + ?MAP(name, ?R_REF(redis_action)), + #{ + desc => <<"Redis Action Config">>, + converter => fun ?MODULE:resource_opts_converter/2, + required => false + } + )}; +fields(redis_action) -> + Schema = + emqx_bridge_v2_schema:make_producer_action_schema( + ?HOCON( + ?R_REF(emqx_bridge_redis, action_parameters), + #{ + required => true, + desc => ?DESC(producer_action) + } + ) + ), + ResOpts = + {resource_opts, + ?HOCON( + ?R_REF(resource_opts), + #{ + required => true, + desc => ?DESC(emqx_resource_schema, resource_opts) + } + )}, + RedisType = + {redis_type, + ?HOCON( + ?ENUM([single, sentinel, cluster]), + #{required => true, desc => ?DESC(redis_type)} + )}, + [RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)]; +fields(resource_opts) -> + emqx_resource_schema:create_opts([ + {batch_size, #{desc => ?DESC(batch_size)}}, + {batch_time, #{desc => ?DESC(batch_time)}} + ]); +%%========================================= +%% HTTP API fields +%%========================================= +fields("post_connector") -> + emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("config_connector"); +fields("put_connector") -> + fields("config_connector"); +fields("get_connector") -> + emqx_bridge_schema:status_fields() ++ + fields("post_connector"); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); +fields("post_bridge_v2") -> + emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_bridge_v2"); +fields("put_bridge_v2") -> + fields(redis_action); +fields("get_single") -> + emqx_bridge_schema:status_fields() ++ fields("put_single"); +fields("put_single") -> + fields("config_connector"); +fields("post_single") -> + emqx_bridge_redis:type_name_fields(?TYPE) ++ fields("put_single"). + +desc("config_connector") -> + ?DESC(emqx_bridge_redis, "desc_config"); +desc(redis_action) -> + ?DESC(redis_action); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, resource_opts); +desc(_Name) -> + undefined. + +resource_opts_converter(undefined, _Opts) -> + undefined; +resource_opts_converter(Conf, _Opts) -> + maps:map( + fun(_Name, SubConf) -> + case SubConf of + #{<<"redis_type">> := <<"cluster">>} -> + ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}), + %% cluster don't support batch + SubConf#{ + <<"resource_opts">> => + ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>} + }; + _ -> + SubConf + end + end, + Conf + ). + +%%------------------------------------------------------------------------------------------------- +%% `emqx_bridge_v2_schema' "unofficial" API +%%------------------------------------------------------------------------------------------------- + +bridge_v2_examples(Method) -> + [ + #{ + <<"redis_single_producer">> => #{ + summary => <<"Redis Single Producer Action">>, + value => action_example(single, Method) + } + }, + #{ + <<"redis_sentinel_producer">> => #{ + summary => <<"Redis Sentinel Producer Action">>, + value => action_example(sentinel, Method) + } + }, + #{ + <<"redis_cluster_producer">> => #{ + summary => <<"Redis Cluster Producer Action">>, + value => action_example(cluster, Method) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"redis_single_producer">> => #{ + summary => <<"Redis Single Producer Connector">>, + value => connector_example(single, Method) + } + }, + #{ + <<"redis_cluster_producer">> => #{ + summary => <<"Redis Cluster Producer Connector">>, + value => connector_example(cluster, Method) + } + }, + #{ + <<"redis_sentinel_producer">> => #{ + summary => <<"Redis Sentinel Producer Connector">>, + value => connector_example(sentinel, Method) + } + } + ]. + +conn_bridge_examples(Method) -> + emqx_bridge_redis:conn_bridge_examples(Method). + +action_example(RedisType, post) -> + maps:merge( + action_example(RedisType, put), + #{ + type => <<"redis">>, + name => <<"my_action">> + } + ); +action_example(RedisType, get) -> + maps:merge( + action_example(RedisType, put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +action_example(RedisType, put) -> + #{ + redis_type => RedisType, + enable => true, + connector => <<"my_connector_name">>, + description => <<"My action">>, + parameters => #{ + command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>] + }, + resource_opts => #{batch_size => 1} + }. + +connector_example(RedisType, get) -> + maps:merge( + connector_example(RedisType, put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +connector_example(RedisType, post) -> + maps:merge( + connector_example(RedisType, put), + #{ + type => <<"redis_single_producer">>, + name => <<"my_connector">> + } + ); +connector_example(RedisType, put) -> + #{ + enable => true, + desc => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>, + parameters => connector_parameter(RedisType), + pool_size => 8, + database => 1, + username => <<"test">>, + password => <<"******">>, + auto_reconnect => true, + ssl => #{enable => false} + }. + +connector_parameter(single) -> + #{redis_type => single, server => <<"127.0.0.1:6379">>}; +connector_parameter(cluster) -> + #{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>}; +connector_parameter(sentinel) -> + #{ + redis_type => sentinel, + servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, + sentinel => <<"myredismaster">> + }. diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl index c2430c076..125d84d0f 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl @@ -56,6 +56,7 @@ ). all() -> [{group, transports}, {group, rest}]. +suite() -> [{timetrap, {minutes, 20}}]. groups() -> ResourceSpecificTCs = [t_create_delete_bridge], @@ -143,15 +144,19 @@ redis_checks() -> end. end_per_suite(_Config) -> - ok = delete_all_bridges(), + ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]), _ = application:stop(emqx_connector), ok. -init_per_testcase(_Testcase, Config) -> +init_per_testcase(Testcase, Config0) -> + emqx_logger:set_log_level(debug), ok = delete_all_rules(), - ok = delete_all_bridges(), + ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = <<(atom_to_binary(Testcase))/binary, UniqueNum/binary>>, + Config = [{bridge_name, Name} | Config0], case {?config(connector_type, Config), ?config(batch_mode, Config)} of {undefined, _} -> Config; @@ -165,7 +170,13 @@ init_per_testcase(_Testcase, Config) -> IsBatch = (BatchMode =:= batch_on), BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS), BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig}, - [{bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config] + BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"), + [ + {bridge_type, BridgeType}, + {bridge_config, BridgeConfig1}, + {is_batch, IsBatch} + | Config + ] end. end_per_testcase(_Testcase, Config) -> @@ -173,10 +184,18 @@ end_per_testcase(_Testcase, Config) -> ProxyPort = ?config(proxy_port, Config), ok = snabbkaffe:stop(), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - ok = delete_all_bridges(). + ok = emqx_bridge_v2_SUITE:delete_all_bridges_and_connectors(). t_create_delete_bridge(Config) -> - Name = <<"mybridge">>, + Pid = erlang:whereis(eredis_sentinel), + ct:pal("t_create_detele_bridge:~p~n", [ + #{ + config => Config, + sentinel => Pid, + eredis_sentinel => Pid =/= undefined andalso erlang:process_info(Pid) + } + ]), + Name = ?config(bridge_name, Config), Type = ?config(connector_type, Config), BridgeConfig = ?config(bridge_config, Config), IsBatch = ?config(is_batch, Config), @@ -184,13 +203,11 @@ t_create_delete_bridge(Config) -> {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), - ResourceId = emqx_bridge_resource:resource_id(Type, Name), - ?WAIT( {ok, connected}, emqx_resource:health_check(ResourceId), - 5 + 10 ), RedisType = atom_to_binary(Type), @@ -244,7 +261,7 @@ t_check_values(_Config) -> ). t_check_replay(Config) -> - Name = <<"toxic_bridge">>, + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, Topic = <<"local_topic/test">>, ProxyName = "redis_single_tcp", @@ -324,15 +341,15 @@ t_permanent_error(_Config) -> ), ok = emqx_bridge:remove(Type, Name). -t_auth_username_password(_Config) -> - Name = <<"mybridge">>, +t_auth_username_password(Config) -> + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, - ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeConfig = username_password_redis_bridge_config(), ?assertMatch( {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), ?WAIT( {ok, connected}, emqx_resource:health_check(ResourceId), @@ -340,16 +357,16 @@ t_auth_username_password(_Config) -> ), ok = emqx_bridge:remove(Type, Name). -t_auth_error_username_password(_Config) -> - Name = <<"mybridge">>, +t_auth_error_username_password(Config) -> + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, - ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeConfig0 = username_password_redis_bridge_config(), BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}), ?assertMatch( {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), ?WAIT( {ok, disconnected}, emqx_resource:health_check(ResourceId), @@ -361,16 +378,16 @@ t_auth_error_username_password(_Config) -> ), ok = emqx_bridge:remove(Type, Name). -t_auth_error_password_only(_Config) -> - Name = <<"mybridge">>, +t_auth_error_password_only(Config) -> + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, - ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeConfig0 = toxiproxy_redis_bridge_config(), BridgeConfig = maps:merge(BridgeConfig0, #{<<"password">> => <<"wrong_password">>}), ?assertMatch( {ok, _}, emqx_bridge:create(Type, Name, BridgeConfig) ), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), ?assertEqual( {ok, disconnected}, emqx_resource:health_check(ResourceId) @@ -382,7 +399,7 @@ t_auth_error_password_only(_Config) -> ok = emqx_bridge:remove(Type, Name). t_create_disconnected(Config) -> - Name = <<"toxic_bridge">>, + Name = ?config(bridge_name, Config), Type = <<"redis_single">>, ?check_trace( @@ -450,10 +467,8 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> added_msgs(ResourceId, BaseTopic, Payload) -> lists:flatmap( fun(K) -> - {ok, Results} = emqx_resource:simple_sync_query( - ResourceId, - {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]} - ), + Message = {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}, + {ok, Results} = emqx_resource:simple_sync_query(ResourceId, Message), [El || El <- Results, El =:= Payload] end, [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)] @@ -482,14 +497,6 @@ delete_all_rules() -> emqx_rule_engine:get_rules() ). -delete_all_bridges() -> - lists:foreach( - fun(#{name := Name, type := Type}) -> - emqx_bridge:remove(Type, Name) - end, - emqx_bridge:list() - ). - all_test_hosts() -> Confs = [ ?REDIS_TOXYPROXY_CONNECT_CONFIG @@ -554,12 +561,12 @@ redis_connect_configs() -> tcp => #{ <<"servers">> => <<"redis-sentinel:26379">>, <<"redis_type">> => <<"sentinel">>, - <<"sentinel">> => <<"mymaster">> + <<"sentinel">> => <<"mytcpmaster">> }, tls => #{ <<"servers">> => <<"redis-sentinel-tls:26380">>, <<"redis_type">> => <<"sentinel">>, - <<"sentinel">> => <<"mymaster">>, + <<"sentinel">> => <<"mytlsmaster">>, <<"ssl">> => redis_connect_ssl_opts(redis_sentinel) } }, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index cc78829e7..99f76f635 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -9,6 +9,9 @@ stdlib, ecpool, emqx_resource, + eredis, + %% eredis_cluster has supervisor should be started before emqx_connector + %% otherwise the first start redis_cluster will fail. eredis_cluster, ehttpc, jose, diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 389623b0a..1d16fd1a1 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -41,6 +41,8 @@ resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; resource_type(timescale) -> emqx_postgresql; +resource_type(redis) -> + emqx_bridge_redis_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -138,6 +140,14 @@ connector_structs() -> desc => <<"Matrix Connector Config">>, required => false } + )}, + {redis, + mk( + hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")), + #{ + desc => <<"Redis Connector Config">>, + required => false + } )} ]. @@ -164,7 +174,8 @@ schema_modules() -> emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, emqx_bridge_timescale, - emqx_postgresql_connector_schema + emqx_postgresql_connector_schema, + emqx_bridge_redis_schema ]. api_schemas(Method) -> @@ -188,7 +199,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), - api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector") + api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), + api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d6f8608ae..9759c8f0a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -71,16 +71,28 @@ enterprise_fields_connectors() -> []. -endif. -connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; -connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; -connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; -connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; -connector_type_to_bridge_types(matrix) -> [matrix]; -connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; -connector_type_to_bridge_types(pgsql) -> [pgsql]; -connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; -connector_type_to_bridge_types(syskeeper_proxy) -> []; -connector_type_to_bridge_types(timescale) -> [timescale]. +connector_type_to_bridge_types(azure_event_hub_producer) -> + [azure_event_hub_producer]; +connector_type_to_bridge_types(confluent_producer) -> + [confluent_producer]; +connector_type_to_bridge_types(gcp_pubsub_producer) -> + [gcp_pubsub, gcp_pubsub_producer]; +connector_type_to_bridge_types(kafka_producer) -> + [kafka, kafka_producer]; +connector_type_to_bridge_types(matrix) -> + [matrix]; +connector_type_to_bridge_types(mongodb) -> + [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(pgsql) -> + [pgsql]; +connector_type_to_bridge_types(syskeeper_forwarder) -> + [syskeeper_forwarder]; +connector_type_to_bridge_types(syskeeper_proxy) -> + []; +connector_type_to_bridge_types(timescale) -> + [timescale]; +connector_type_to_bridge_types(redis) -> + [redis, redis_single, redis_sentinel, redis_cluster]. actions_config_name() -> <<"actions">>. @@ -125,7 +137,7 @@ split_bridge_to_connector_and_action( BridgeType, BridgeV1Conf ); false -> - %% We do an automatic transfomation to get the connector config + %% We do an automatic transformation to get the connector config %% if the callback is not defined. %% Get connector fields from bridge config lists:foldl( diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6b3d5b4fa..022c6fcb0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -844,7 +844,7 @@ parse_object_loop(PropList0, Module, Options) -> ), parse_object_loop(PropList, Module, Options, _Props = [], _Required = [], _Refs = []). -parse_object_loop([], _Modlue, _Options, Props, Required, Refs) -> +parse_object_loop([], _Module, _Options, Props, Required, Refs) -> {lists:reverse(Props), lists:usort(Required), Refs}; parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs) -> NameBin = to_bin(Name), diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 44137546d..25d64e2fa 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -20,7 +20,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). --export([roots/0, fields/1]). +-export([roots/0, fields/1, redis_fields/0, desc/1]). -behaviour(emqx_resource). @@ -50,55 +50,46 @@ roots() -> {config, #{ type => hoconsc:union( [ - hoconsc:ref(?MODULE, cluster), - hoconsc:ref(?MODULE, single), - hoconsc:ref(?MODULE, sentinel) + ?R_REF(redis_cluster), + ?R_REF(redis_single), + ?R_REF(redis_sentinel) ] ) }} ]. -fields(single) -> - [ - {server, server()}, - {redis_type, #{ - type => single, - default => single, - required => false, - desc => ?DESC("single") - }} - ] ++ +fields(redis_single) -> + fields(redis_single_connector) ++ redis_fields() ++ emqx_connector_schema_lib:ssl_fields(); -fields(cluster) -> +fields(redis_single_connector) -> [ - {servers, servers()}, - {redis_type, #{ - type => cluster, - default => cluster, - required => false, - desc => ?DESC("cluster") - }} - ] ++ + {server, server()}, + redis_type(single) + ]; +fields(redis_cluster) -> + fields(redis_cluster_connector) ++ lists:keydelete(database, 1, redis_fields()) ++ emqx_connector_schema_lib:ssl_fields(); -fields(sentinel) -> +fields(redis_cluster_connector) -> [ {servers, servers()}, - {redis_type, #{ - type => sentinel, - default => sentinel, - required => false, - desc => ?DESC("sentinel") - }}, + redis_type(cluster) + ]; +fields(redis_sentinel) -> + fields(redis_sentinel_connector) ++ + redis_fields() ++ + emqx_connector_schema_lib:ssl_fields(); +fields(redis_sentinel_connector) -> + [ + {servers, servers()}, + redis_type(sentinel), {sentinel, #{ type => string(), required => true, desc => ?DESC("sentinel_desc") }} - ] ++ - redis_fields() ++ - emqx_connector_schema_lib:ssl_fields(). + ]. server() -> Meta = #{desc => ?DESC("server")}, @@ -108,64 +99,52 @@ servers() -> Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?REDIS_HOST_OPTIONS). +desc(redis_cluster_connector) -> + ?DESC(redis_cluster_connector); +desc(redis_single_connector) -> + ?DESC(redis_single_connector); +desc(redis_sentinel_connector) -> + ?DESC(redis_sentinel_connector); +desc(_) -> + undefined. + %% =================================================================== +redis_type(Type) -> + {redis_type, #{ + type => Type, + default => Type, + required => false, + desc => ?DESC(Type) + }}. + callback_mode() -> always_sync. -on_start( - InstId, - #{ - redis_type := Type, - pool_size := PoolSize, - ssl := SSL - } = Config -) -> +on_start(InstId, Config0) -> ?SLOG(info, #{ msg => "starting_redis_connector", connector => InstId, - config => emqx_utils:redact(Config) + config => emqx_utils:redact(Config0) }), - ConfKey = - case Type of - single -> server; - _ -> servers - end, - Servers0 = maps:get(ConfKey, Config), - Servers1 = lists:map( - fun(#{hostname := Host, port := Port}) -> - {Host, Port} - end, - emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS) - ), - Servers = [{servers, Servers1}], - Database = - case Type of - cluster -> []; - _ -> [{database, maps:get(database, Config)}] - end, + Config = config(Config0), + #{pool_size := PoolSize, ssl := SSL, redis_type := Type} = Config, + Options = ssl_options(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}], Opts = [ - {pool_size, PoolSize}, {username, maps:get(username, Config, undefined)}, {password, maps:get(password, Config, "")}, + {servers, servers(Config)}, + {options, Options}, + {pool_size, PoolSize}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} - ] ++ Database ++ Servers, - Options = - case maps:get(enable, SSL) of - true -> - [ - {ssl, true}, - {ssl_options, emqx_tls_lib:to_client_opts(SSL)} - ]; - false -> - [{ssl, false}] - end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], + ] ++ database(Config), + State = #{pool_name => InstId, type => Type}, ok = emqx_resource:allocate_resource(InstId, type, Type), ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case Type of cluster -> - case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of + case eredis_cluster:start_pool(InstId, Opts) of {ok, _} -> {ok, State}; {ok, _, _} -> @@ -174,7 +153,7 @@ on_start( {error, Reason} end; _ -> - case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of + case emqx_resource_pool:start(InstId, ?MODULE, Opts) of ok -> {ok, State}; {error, Reason} -> @@ -182,6 +161,14 @@ on_start( end end. +ssl_options(SSL = #{enable := true}) -> + [ + {ssl, true}, + {ssl_options, emqx_tls_lib:to_client_opts(SSL)} + ]; +ssl_options(#{enable := false}) -> + [{ssl, false}]. + on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_redis_connector", @@ -189,7 +176,11 @@ on_stop(InstId, _State) -> }), case emqx_resource:get_allocated_resources(InstId) of #{pool_name := PoolName, type := cluster} -> - eredis_cluster:stop_pool(PoolName); + case eredis_cluster:stop_pool(PoolName) of + {error, not_found} -> ok; + ok -> ok; + Error -> Error + end; #{pool_name := PoolName, type := _} -> emqx_resource_pool:stop(PoolName); _ -> @@ -244,8 +235,17 @@ is_unrecoverable_error(_) -> on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> case eredis_cluster:pool_exists(PoolName) of true -> - Health = eredis_cluster:ping_all(PoolName), - status_result(Health); + %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster. + %% we need restart eredis_cluster pool when pool_worker(slot) is empty. + %% If the pool is empty, it means that there are no workers attempting to reconnect. + %% In this case, we can directly consider it as a disconnect and then proceed to reconnect. + case eredis_cluster_monitor:get_all_pools(PoolName) of + [] -> + disconnected; + [_ | _] -> + Health = eredis_cluster:ping_all(PoolName), + status_result(Health) + end; false -> disconnected end; @@ -289,6 +289,28 @@ wrap_qp_result(Results) when is_list(Results) -> end. %% =================================================================== +%% parameters for connector +config(#{parameters := #{} = Param} = Config) -> + maps:merge(maps:remove(parameters, Config), Param); +%% is for authn/authz +config(Config) -> + Config. + +servers(#{server := Server}) -> + servers(Server); +servers(#{servers := Servers}) -> + servers(Servers); +servers(Servers) -> + lists:map( + fun(#{hostname := Host, port := Port}) -> + {Host, Port} + end, + emqx_schema:parse_servers(Servers, ?REDIS_HOST_OPTIONS) + ). + +database(#{redis_type := cluster}) -> []; +database(#{database := Database}) -> [{database, Database}]. + connect(Opts) -> eredis:start_link(Opts). diff --git a/apps/emqx_redis/test/emqx_redis_SUITE.erl b/apps/emqx_redis/test/emqx_redis_SUITE.erl index 8fcbf2b63..e7ce7dd4f 100644 --- a/apps/emqx_redis/test/emqx_redis_SUITE.erl +++ b/apps/emqx_redis/test/emqx_redis_SUITE.erl @@ -223,7 +223,7 @@ redis_config_base(Type, ServerKey) -> "sentinel" -> Host = ?REDIS_SENTINEL_HOST, Port = ?REDIS_SENTINEL_PORT, - MaybeSentinel = " sentinel = mymaster\n", + MaybeSentinel = " sentinel = mytcpmaster\n", MaybeDatabase = " database = 1\n"; "single" -> Host = ?REDIS_SINGLE_HOST, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 7feacee77..8b27cdda4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -13,7 +13,11 @@ uuid, emqx, emqx_utils, - emqx_ctl + emqx_ctl, + %% rule_engine should wait for bridge connector start, + %% it's will check action/connector ref's exist. + emqx_bridge, + emqx_connector ]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, diff --git a/rel/i18n/emqx_bridge_redis.hocon b/rel/i18n/emqx_bridge_redis.hocon index 05c8d95a6..03831b02f 100644 --- a/rel/i18n/emqx_bridge_redis.hocon +++ b/rel/i18n/emqx_bridge_redis.hocon @@ -32,14 +32,19 @@ desc_type.desc: desc_type.label: """Bridge Type""" -local_topic.desc: +desc_local_topic.desc: """The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic matching the local_topic will be forwarded.
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that match local_topic will be forwarded.""" -local_topic.label: +desc_local_topic.label: """Local Topic""" +desc_action_parameters.desc: +"""The parameters of the action.""" +desc_action_parameters.label: +"""Action Parameters""" + } diff --git a/rel/i18n/emqx_bridge_redis_schema.hocon b/rel/i18n/emqx_bridge_redis_schema.hocon new file mode 100644 index 000000000..861c0c185 --- /dev/null +++ b/rel/i18n/emqx_bridge_redis_schema.hocon @@ -0,0 +1,41 @@ +emqx_bridge_redis_schema { +redis_parameters.label: +"""Redis Type Specific Parameters""" + +redis_parameters.desc: +"""Set of parameters specific for the given type of this Redis connector, `redis_type` can be one of `single`, `cluster` or `sentinel`.""" + +producer_action.desc: +"""The parameters of the action.""" +producer_action.label: +"""Action Parameters""" + +redis_type.label: +"""Redis Type""" +redis_type.desc: +"""Single mode. Must be set to 'single' when Redis server is running in single mode. +Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode. +Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode.""" + +batch_size.label: +"""Batch Size""" +batch_size.desc: +"""This parameter defines the upper limit of the batch count. +Setting this value to 1 effectively disables batching, as it indicates that only one item will be processed per batch. +Note on Redis Cluster Mode: +In the context of Redis Cluster Mode, it is important to note that batching is not supported. +Consequently, the batch_size is always set to 1, +reflecting the mode inherent limitation in handling batch operations.""" + +batch_time.desc: +"""Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage.""" + +batch_time.label: +"""Max batch wait time, disable when in Redis Cluster Mode.""" + +redis_action.label: +"""Redis Action""" +redis_action.desc: +"""Action to interact with a Redis connector.""" + +} diff --git a/rel/i18n/emqx_connector_schema.hocon b/rel/i18n/emqx_connector_schema.hocon index 16d153e12..24baefd89 100644 --- a/rel/i18n/emqx_connector_schema.hocon +++ b/rel/i18n/emqx_connector_schema.hocon @@ -1,5 +1,11 @@ emqx_connector_schema { +config_enable.desc: +"""Enable (true) or disable (false) this connector.""" + +config_enable.label: +"""Enable or Disable""" + desc_connectors.desc: """Connectors that are used to connect to external systems""" desc_connectors.label: diff --git a/rel/i18n/emqx_redis.hocon b/rel/i18n/emqx_redis.hocon index af20c5980..28f531094 100644 --- a/rel/i18n/emqx_redis.hocon +++ b/rel/i18n/emqx_redis.hocon @@ -47,4 +47,18 @@ single.desc: single.label: """Single Mode""" +redis_cluster_connector.label: +"""Redis Cluster Connector""" +redis_cluster_connector.desc: +"""Redis connector in cluster mode""" + +redis_sentinel_connector.label: +"""Redis Sentinel Connector""" +redis_sentinel_connector.desc: +"""Redis connector in sentinel mode""" + +redis_single_connector.label: +"""Redis Single Connector""" +redis_single_connector.desc: +"""Redis connector in sentinel mode""" } diff --git a/scripts/pre-compile.sh b/scripts/pre-compile.sh index dfad7c869..ce2c532ca 100755 --- a/scripts/pre-compile.sh +++ b/scripts/pre-compile.sh @@ -27,10 +27,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)" +UPDATE_I18N=${UPDATE_I18N:-true} # download desc (i18n) translations -curl -L --silent --show-error \ - --output "apps/emqx_dashboard/priv/desc.zh.hocon" \ - "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon" +if [ "$UPDATE_I18N" = "true" ]; then + echo "updating i18n file from emqx-i18n repo" + start=$(date +%s) + curl -L --silent --show-error \ + --output "apps/emqx_dashboard/priv/desc.zh.hocon" \ + "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon" + end=$(date +%s) + duration=$(echo "$end $start" | awk '{print $1 - $2}') + echo "updated i18n file using $duration seconds, set UPDATE_I18N=false to skip" +else + echo "skipping update i18n file from emqx-i18n repo, set UPDATE_I18N=true to update" +fi # TODO # make sbom a build artifcat From 009a15c7d0228181c75e6c10ddc440bfcb478190 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 29 Nov 2023 12:12:42 +0200 Subject: [PATCH 6/8] fix(emqx_mgmt_api_data_backup): validate empty file in upload API fixes EMQX-11488 --- apps/emqx_management/src/emqx_mgmt_api_data_backup.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl index 4cd7d404e..7cd151b08 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl @@ -204,7 +204,7 @@ data_export(post, _Request) -> data_import(post, #{body := #{<<"filename">> := FileName} = Body}) -> case safe_parse_node(Body) of {error, Msg} -> - {400, #{code => 'BAD_REQUEST', message => Msg}}; + {400, #{code => ?BAD_REQUEST, message => Msg}}; FileNode -> CoreNode = core_node(FileNode), response( @@ -231,8 +231,10 @@ data_files(post, #{body := #{<<"filename">> := #{type := _} = File}}) -> ok -> {204}; {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => emqx_mgmt_data_backup:format_error(Reason)}} + {400, #{code => ?BAD_REQUEST, message => emqx_mgmt_data_backup:format_error(Reason)}} end; +data_files(post, #{body := _}) -> + {400, #{code => ?BAD_REQUEST, message => "Missing required parameter: filename"}}; data_files(get, #{query_string := PageParams}) -> case emqx_mgmt_api:parse_pager_params(PageParams) of false -> @@ -244,7 +246,7 @@ data_files(get, #{query_string := PageParams}) -> data_file_by_name(Method, #{bindings := #{filename := Filename}, query_string := QS}) -> case safe_parse_node(QS) of {error, Msg} -> - {400, #{code => 'BAD_REQUEST', message => Msg}}; + {400, #{code => ?BAD_REQUEST, message => Msg}}; Node -> case get_or_delete_file(Method, Filename, Node) of {error, not_found} -> From 9e85deb273db10a6fe02ea317129fa8ebfb3ca79 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 29 Nov 2023 18:22:38 +0800 Subject: [PATCH 7/8] chore: skip download i18n translate file env --- scripts/pre-compile.sh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scripts/pre-compile.sh b/scripts/pre-compile.sh index ce2c532ca..71b42d003 100755 --- a/scripts/pre-compile.sh +++ b/scripts/pre-compile.sh @@ -27,21 +27,21 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." I18N_REPO_BRANCH="v$(./pkg-vsn.sh "${PROFILE_STR}" | tr -d '.' | cut -c 1-2)" -UPDATE_I18N=${UPDATE_I18N:-true} +DOWNLOAD_I18N_TRANSLATIONS=${DOWNLOAD_I18N_TRANSLATIONS:-true} # download desc (i18n) translations -if [ "$UPDATE_I18N" = "true" ]; then - echo "updating i18n file from emqx-i18n repo" +if [ "$DOWNLOAD_I18N_TRANSLATIONS" = "true" ]; then + echo "downloading i18n translation from emqx/emqx-i18n" start=$(date +%s) curl -L --silent --show-error \ --output "apps/emqx_dashboard/priv/desc.zh.hocon" \ "https://raw.githubusercontent.com/emqx/emqx-i18n/${I18N_REPO_BRANCH}/desc.zh.hocon" end=$(date +%s) duration=$(echo "$end $start" | awk '{print $1 - $2}') - echo "updated i18n file using $duration seconds, set UPDATE_I18N=false to skip" + echo "downloaded i18n translation in $duration seconds, set DOWNLOAD_I18N_TRANSLATIONS=false to skip" else - echo "skipping update i18n file from emqx-i18n repo, set UPDATE_I18N=true to update" + echo "skipping to download i18n translation from emqx/emqx-i18n, set DOWNLOAD_I18N_TRANSLATIONS=true to update" fi # TODO -# make sbom a build artifcat +# make sbom a build artifact # ./scripts/update-bom.sh "$PROFILE_STR" ./rel From f54bda70637cd02bb2bafca6deffc1234cb438de Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 30 Nov 2023 19:10:12 +0200 Subject: [PATCH 8/8] fix(emqx_mgmt_api_data_backup): add count and hasnext meta paging params --- .../src/emqx_mgmt_api_data_backup.erl | 8 ++++++-- .../test/emqx_mgmt_api_data_backup_SUITE.erl | 20 ++++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl index 7cd151b08..ef0b095cb 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl @@ -240,7 +240,8 @@ data_files(get, #{query_string := PageParams}) -> false -> {400, #{code => ?BAD_REQUEST, message => <<"page_limit_invalid">>}}; #{page := Page, limit := Limit} = Pager -> - {200, #{data => list_backup_files(Page, Limit), meta => Pager}} + {Count, HasNext, Data} = list_backup_files(Page, Limit), + {200, #{data => Data, meta => Pager#{count => Count, hasnext => HasNext}}} end. data_file_by_name(Method, #{bindings := #{filename := Filename}, query_string := QS}) -> @@ -295,7 +296,10 @@ response({error, Reason}) -> list_backup_files(Page, Limit) -> Start = Page * Limit - Limit + 1, - lists:sublist(list_backup_files(), Start, Limit). + AllFiles = list_backup_files(), + Count = length(AllFiles), + HasNext = Start + Limit - 1 < Count, + {Count, HasNext, lists:sublist(AllFiles, Start, Limit)}. list_backup_files() -> Nodes = emqx:running_nodes(), diff --git a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl index cef32ab92..c2e01fedc 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl @@ -80,22 +80,32 @@ t_list_backups(Config) -> [{ok, _} = export_backup(?NODE2_PORT, Auth) || _ <- lists:seq(1, 10)], {ok, RespBody} = list_backups(?NODE1_PORT, Auth, <<"1">>, <<"100">>), - #{<<"data">> := Data, <<"meta">> := _} = emqx_utils_json:decode(RespBody), + #{<<"data">> := Data, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode( + RespBody + ), ?assertEqual(20, length(Data)), {ok, EmptyRespBody} = list_backups(?NODE2_PORT, Auth, <<"2">>, <<"100">>), - #{<<"data">> := EmptyData, <<"meta">> := _} = emqx_utils_json:decode(EmptyRespBody), + #{<<"data">> := EmptyData, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode( + EmptyRespBody + ), ?assertEqual(0, length(EmptyData)), {ok, RespBodyP1} = list_backups(?NODE3_PORT, Auth, <<"1">>, <<"10">>), {ok, RespBodyP2} = list_backups(?NODE3_PORT, Auth, <<"2">>, <<"10">>), {ok, RespBodyP3} = list_backups(?NODE3_PORT, Auth, <<"3">>, <<"10">>), - #{<<"data">> := DataP1, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP1), + #{<<"data">> := DataP1, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := true}} = emqx_utils_json:decode( + RespBodyP1 + ), ?assertEqual(10, length(DataP1)), - #{<<"data">> := DataP2, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP2), + #{<<"data">> := DataP2, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode( + RespBodyP2 + ), ?assertEqual(10, length(DataP2)), - #{<<"data">> := DataP3, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP3), + #{<<"data">> := DataP3, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode( + RespBodyP3 + ), ?assertEqual(0, length(DataP3)), ?assertEqual(Data, DataP1 ++ DataP2).