From f6fbbf3ee3ffd9816cb456e5097df2bcf843b3ec Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 18 Jan 2023 14:34:11 +0200 Subject: [PATCH] chore(bridges): reduce Redis bridge flakyness --- .ci/docker-compose-file/Makefile.local | 2 +- .../test/emqx_ee_bridge_redis_SUITE.erl | 92 +++++++++++-------- 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/.ci/docker-compose-file/Makefile.local b/.ci/docker-compose-file/Makefile.local index ff4f348b0..2cf0802ce 100644 --- a/.ci/docker-compose-file/Makefile.local +++ b/.ci/docker-compose-file/Makefile.local @@ -52,7 +52,7 @@ down: down --remove-orphans ct: - docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' -v --suite $(SUITE)" + docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' --readable true -v --suite $(SUITE)" ct-all: docker exec -i "$(CONTAINER)" bash -c "make ct" diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index feebcfade..2b67787b2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -137,6 +137,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Testcase, Config) -> + ok = delete_all_rules(), ok = delete_all_bridges(), case ?config(transport_type, Config) of undefined -> @@ -248,29 +249,27 @@ t_check_replay(Config) -> ), ?check_trace( - begin - ?wait_async_action( - with_down_failure(Config, ProxyName, fun() -> - {_, {ok, _}} = - ?wait_async_action( - lists:foreach( - fun(_) -> - _ = publish_message(Topic, <<"test_payload">>) - end, - lists:seq(1, ?BATCH_SIZE) - ), - #{ - ?snk_kind := redis_ee_connector_send_done, - batch := true, - result := {error, _} - }, - 10_000 - ) - end), - #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, - 10_000 - ) - end, + ?wait_async_action( + with_down_failure(Config, ProxyName, fun() -> + {_, {ok, _}} = + ?wait_async_action( + lists:foreach( + fun(_) -> + _ = publish_message(Topic, <<"test_payload">>) + end, + lists:seq(1, ?BATCH_SIZE) + ), + #{ + ?snk_kind := redis_ee_connector_send_done, + batch := true, + result := {error, _} + }, + 10_000 + ) + end), + #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, + 10_000 + ), fun(Trace) -> ?assert( ?strict_causality( @@ -340,7 +339,7 @@ with_down_failure(Config, Name, F) -> ProxyHost = ?config(proxy_host, Config), emqx_common_test_helpers:with_failure(down, Name, ProxyHost, ProxyPort, F). -check_resource_queries(ResourceId, Topic, IsBatch) -> +check_resource_queries(ResourceId, BaseTopic, IsBatch) -> RandomPayload = rand:bytes(20), N = case IsBatch of @@ -348,18 +347,18 @@ check_resource_queries(ResourceId, Topic, IsBatch) -> false -> 1 end, ?check_trace( - begin - ?wait_async_action( - lists:foreach( - fun(_) -> - _ = publish_message(Topic, RandomPayload) - end, - lists:seq(1, N) - ), - #{?snk_kind := redis_ee_connector_send_done, batch := IsBatch}, - 1000 - ) - end, + ?wait_async_action( + lists:foreach( + fun(I) -> + IBin = integer_to_binary(I), + Topic = <>, + _ = publish_message(Topic, RandomPayload) + end, + lists:seq(1, N) + ), + #{?snk_kind := redis_ee_connector_send_done, batch := IsBatch}, + 5000 + ), fun(Trace) -> AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)), case IsBatch of @@ -394,6 +393,14 @@ conf_schema(StructName) -> roots => [{root, hoconsc:ref(emqx_ee_bridge_redis, StructName)}] }. +delete_all_rules() -> + lists:foreach( + fun(#{id := RuleId}) -> + emqx_rule_engine:delete_rule(RuleId) + end, + emqx_rule_engine:get_rules() + ). + delete_all_bridges() -> lists:foreach( fun(#{name := Name, type := Type}) -> @@ -490,7 +497,8 @@ toxiproxy_redis_bridge_config() -> <<"query_mode">> => <<"async">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), - <<"health_check_interval">> => <<"1s">> + <<"health_check_interval">> => <<"1s">>, + <<"start_timeout">> => <<"15s">> } }, maps:merge(Conf0, ?COMMON_REDIS_OPTS). @@ -500,8 +508,10 @@ invalid_command_bridge_config() -> Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS), Conf1#{ <<"resource_opts">> => #{ + <<"query_mode">> => <<"sync">>, <<"batch_size">> => <<"1">>, - <<"worker_pool_size">> => <<"1">> + <<"worker_pool_size">> => <<"1">>, + <<"start_timeout">> => <<"15s">> }, <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>] }. @@ -510,12 +520,14 @@ resource_configs() -> #{ batch_off => #{ <<"query_mode">> => <<"sync">>, - <<"batch_size">> => <<"1">> + <<"batch_size">> => <<"1">>, + <<"start_timeout">> => <<"15s">> }, batch_on => #{ <<"query_mode">> => <<"async">>, <<"worker_pool_size">> => <<"1">>, - <<"batch_size">> => integer_to_binary(?BATCH_SIZE) + <<"batch_size">> => integer_to_binary(?BATCH_SIZE), + <<"start_timeout">> => <<"15s">> } }.