From 44a6e5ed15ec6122bd76ee890075a45c08aff98d Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 18 Jan 2023 14:33:45 +0200 Subject: [PATCH 1/2] chore(resources): add missing parameters to emqx_resource schema --- apps/emqx_resource/include/emqx_resource.hrl | 8 ++++++++ apps/emqx_resource/src/emqx_resource_manager.erl | 5 ----- .../src/schema/emqx_resource_schema.erl | 14 ++++++++++++++ changes/v5.0.15/feat-9628.en.md | 1 + changes/v5.0.15/feat-9628.zh.md | 1 + 5 files changed, 24 insertions(+), 5 deletions(-) create mode 100644 changes/v5.0.15/feat-9628.en.md create mode 100644 changes/v5.0.15/feat-9628.zh.md diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index d7b080ae8..5785ddead 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -101,6 +101,14 @@ -define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). +%% milliseconds +-define(START_TIMEOUT, 5000). +-define(START_TIMEOUT_RAW, <<"5s">>). + +%% boolean +-define(START_AFTER_CREATED, true). +-define(START_AFTER_CREATED_RAW, <<"true">>). + %% milliseconds -define(AUTO_RESTART_INTERVAL, 60000). -define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index ab726976a..1dd088fca 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -116,11 +116,6 @@ create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> {ok, _Group, Data} = lookup(ResId), {ok, Data}. -%% internal configs --define(START_AFTER_CREATED, true). -%% in milliseconds --define(START_TIMEOUT, 5000). - %% @doc Create a resource_manager and wait until it is running create(MgrId, ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index ea5ee97ca..39513e28c 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -46,6 +46,8 @@ fields("creation_opts") -> [ {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, + {start_after_created, fun start_after_created/1}, + {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, {request_timeout, fun request_timeout/1}, @@ -69,6 +71,18 @@ health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; health_check_interval(required) -> false; health_check_interval(_) -> undefined. +start_after_created(type) -> boolean(); +start_after_created(desc) -> ?DESC("start_after_created"); +start_after_created(default) -> ?START_AFTER_CREATED_RAW; +start_after_created(required) -> false; +start_after_created(_) -> undefined. + +start_timeout(type) -> emqx_schema:duration_ms(); +start_timeout(desc) -> ?DESC("start_timeout"); +start_timeout(default) -> ?START_TIMEOUT_RAW; +start_timeout(required) -> false; +start_timeout(_) -> undefined. + auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); auto_restart_interval(desc) -> ?DESC("auto_restart_interval"); auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW; diff --git a/changes/v5.0.15/feat-9628.en.md b/changes/v5.0.15/feat-9628.en.md new file mode 100644 index 000000000..6f814dd21 --- /dev/null +++ b/changes/v5.0.15/feat-9628.en.md @@ -0,0 +1 @@ +Expose additional resource configuration parameters: `start_after_created` and `start_timeout`. diff --git a/changes/v5.0.15/feat-9628.zh.md b/changes/v5.0.15/feat-9628.zh.md new file mode 100644 index 000000000..fee14181b --- /dev/null +++ b/changes/v5.0.15/feat-9628.zh.md @@ -0,0 +1 @@ +为桥接资源增加了配置参数:`start_after_created` 和 `start_timeout`。 From f6fbbf3ee3ffd9816cb456e5097df2bcf843b3ec Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 18 Jan 2023 14:34:11 +0200 Subject: [PATCH 2/2] chore(bridges): reduce Redis bridge flakyness --- .ci/docker-compose-file/Makefile.local | 2 +- .../test/emqx_ee_bridge_redis_SUITE.erl | 92 +++++++++++-------- 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/.ci/docker-compose-file/Makefile.local b/.ci/docker-compose-file/Makefile.local index ff4f348b0..2cf0802ce 100644 --- a/.ci/docker-compose-file/Makefile.local +++ b/.ci/docker-compose-file/Makefile.local @@ -52,7 +52,7 @@ down: down --remove-orphans ct: - docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' -v --suite $(SUITE)" + docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' --readable true -v --suite $(SUITE)" ct-all: docker exec -i "$(CONTAINER)" bash -c "make ct" diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index feebcfade..2b67787b2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -137,6 +137,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Testcase, Config) -> + ok = delete_all_rules(), ok = delete_all_bridges(), case ?config(transport_type, Config) of undefined -> @@ -248,29 +249,27 @@ t_check_replay(Config) -> ), ?check_trace( - begin - ?wait_async_action( - with_down_failure(Config, ProxyName, fun() -> - {_, {ok, _}} = - ?wait_async_action( - lists:foreach( - fun(_) -> - _ = publish_message(Topic, <<"test_payload">>) - end, - lists:seq(1, ?BATCH_SIZE) - ), - #{ - ?snk_kind := redis_ee_connector_send_done, - batch := true, - result := {error, _} - }, - 10_000 - ) - end), - #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, - 10_000 - ) - end, + ?wait_async_action( + with_down_failure(Config, ProxyName, fun() -> + {_, {ok, _}} = + ?wait_async_action( + lists:foreach( + fun(_) -> + _ = publish_message(Topic, <<"test_payload">>) + end, + lists:seq(1, ?BATCH_SIZE) + ), + #{ + ?snk_kind := redis_ee_connector_send_done, + batch := true, + result := {error, _} + }, + 10_000 + ) + end), + #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, + 10_000 + ), fun(Trace) -> ?assert( ?strict_causality( @@ -340,7 +339,7 @@ with_down_failure(Config, Name, F) -> ProxyHost = ?config(proxy_host, Config), emqx_common_test_helpers:with_failure(down, Name, ProxyHost, ProxyPort, F). -check_resource_queries(ResourceId, Topic, IsBatch) -> +check_resource_queries(ResourceId, BaseTopic, IsBatch) -> RandomPayload = rand:bytes(20), N = case IsBatch of @@ -348,18 +347,18 @@ check_resource_queries(ResourceId, Topic, IsBatch) -> false -> 1 end, ?check_trace( - begin - ?wait_async_action( - lists:foreach( - fun(_) -> - _ = publish_message(Topic, RandomPayload) - end, - lists:seq(1, N) - ), - #{?snk_kind := redis_ee_connector_send_done, batch := IsBatch}, - 1000 - ) - end, + ?wait_async_action( + lists:foreach( + fun(I) -> + IBin = integer_to_binary(I), + Topic = <>, + _ = publish_message(Topic, RandomPayload) + end, + lists:seq(1, N) + ), + #{?snk_kind := redis_ee_connector_send_done, batch := IsBatch}, + 5000 + ), fun(Trace) -> AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)), case IsBatch of @@ -394,6 +393,14 @@ conf_schema(StructName) -> roots => [{root, hoconsc:ref(emqx_ee_bridge_redis, StructName)}] }. +delete_all_rules() -> + lists:foreach( + fun(#{id := RuleId}) -> + emqx_rule_engine:delete_rule(RuleId) + end, + emqx_rule_engine:get_rules() + ). + delete_all_bridges() -> lists:foreach( fun(#{name := Name, type := Type}) -> @@ -490,7 +497,8 @@ toxiproxy_redis_bridge_config() -> <<"query_mode">> => <<"async">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), - <<"health_check_interval">> => <<"1s">> + <<"health_check_interval">> => <<"1s">>, + <<"start_timeout">> => <<"15s">> } }, maps:merge(Conf0, ?COMMON_REDIS_OPTS). @@ -500,8 +508,10 @@ invalid_command_bridge_config() -> Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS), Conf1#{ <<"resource_opts">> => #{ + <<"query_mode">> => <<"sync">>, <<"batch_size">> => <<"1">>, - <<"worker_pool_size">> => <<"1">> + <<"worker_pool_size">> => <<"1">>, + <<"start_timeout">> => <<"15s">> }, <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>] }. @@ -510,12 +520,14 @@ resource_configs() -> #{ batch_off => #{ <<"query_mode">> => <<"sync">>, - <<"batch_size">> => <<"1">> + <<"batch_size">> => <<"1">>, + <<"start_timeout">> => <<"15s">> }, batch_on => #{ <<"query_mode">> => <<"async">>, <<"worker_pool_size">> => <<"1">>, - <<"batch_size">> => integer_to_binary(?BATCH_SIZE) + <<"batch_size">> => integer_to_binary(?BATCH_SIZE), + <<"start_timeout">> => <<"15s">> } }.