From 4e9f2c8f5d261e4fc15a8f19dde83ea74de9b493 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 14 Mar 2024 17:01:02 +0100 Subject: [PATCH] fix: DynamoDB connector status check takes too long The DynamoDB connector status checks takes very long when the server is unavailable which makes the resource manager blocked for a long time. This causes calls to update the Bridge config with the Bridge V1 API fail due to a timeout when it calls the resource manager to remove the channel. A better fix would be to change the resource manager so that the status check cannot block it for a long time. However, this is more complicated so it needs to be done in a later commit. A new ticket has been created for this task https://emqx.atlassian.net/browse/EMQX-12015 . Fixes: https://emqx.atlassian.net/browse/EMQX-11984 --- .../src/emqx_bridge_dynamo_connector.erl | 4 +- .../emqx_bridge_dynamo_connector_client.erl | 2 + .../test/emqx_bridge_dynamo_SUITE.erl | 51 ++++++++++++++++++- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index da71abea7..36f54a63f 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -179,13 +179,13 @@ on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. health_check_timeout() -> - 15000. + 2500. on_get_status(_InstanceId, #{pool_name := Pool} = State) -> Health = emqx_resource_pool:health_check_workers( Pool, {emqx_bridge_dynamo_connector_client, is_connected, [ - emqx_resource_pool:health_check_timeout() + health_check_timeout() ]}, health_check_timeout(), #{return_values => true} diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index 23caa0c30..4f924ef67 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -34,6 +34,8 @@ is_connected(Pid, Timeout) -> try gen_server:call(Pid, is_connected, Timeout) catch + _:{timeout, _} -> + {false, <<"timeout_while_checking_connection_dynamo_client">>}; _:Error -> {false, Error} end. diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index 8fcdf97ce..dab7b21f0 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -88,7 +88,9 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_conf, erlcloud]), + ok = emqx_common_test_helpers:stop_apps([ + emqx_rule_engine, emqx_bridge, emqx_resource, emqx_conf, erlcloud + ]), ok. init_per_testcase(TestCase, Config) -> @@ -134,7 +136,7 @@ common_init(ConfigT) -> emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), % Ensure enterprise bridge module is loaded ok = emqx_common_test_helpers:start_apps([ - emqx_conf, emqx_resource, emqx_bridge + emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine ]), _ = application:ensure_all_started(erlcloud), _ = emqx_bridge_enterprise:module_info(), @@ -273,6 +275,24 @@ create_bridge_http(Params) -> Error -> Error end. +update_bridge_http(#{<<"type">> := Type, <<"name">> := Name} = Config) -> + BridgeID = emqx_bridge_resource:bridge_id(Type, Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeID]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Config) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +get_bridge_http(#{<<"type">> := Type, <<"name">> := Name}) -> + BridgeID = emqx_bridge_resource:bridge_id(Type, Name), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeID]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + send_message(Config, Payload) -> Name = ?config(dynamo_name, Config), BridgeType = ?config(dynamo_bridge_type, Config), @@ -359,6 +379,33 @@ t_setup_via_config_and_publish(Config) -> ), ok. +%% https://emqx.atlassian.net/browse/EMQX-11984 +t_setup_via_http_api_and_update_wrong_config(Config) -> + BridgeType = ?config(dynamo_bridge_type, Config), + Name = ?config(dynamo_name, Config), + PgsqlConfig0 = ?config(dynamo_config, Config), + PgsqlConfig = PgsqlConfig0#{ + <<"name">> => Name, + <<"type">> => BridgeType, + %% NOTE: using literal secret with HTTP API requests. + <<"aws_secret_access_key">> => <> + }, + BrokenConfig = PgsqlConfig#{<<"url">> => <<"http://non_existing_host:9999">>}, + ?assertMatch( + {ok, _}, + create_bridge_http(BrokenConfig) + ), + WrongURL2 = <<"http://non_existing_host:9998">>, + BrokenConfig2 = PgsqlConfig#{<<"url">> => WrongURL2}, + ?assertMatch( + {ok, _}, + update_bridge_http(BrokenConfig2) + ), + %% Check that the update worked + {ok, Result} = get_bridge_http(PgsqlConfig), + ?assertMatch(#{<<"url">> := WrongURL2}, Result), + emqx_bridge:remove(BridgeType, Name). + t_setup_via_http_api_and_publish(Config) -> BridgeType = ?config(dynamo_bridge_type, Config), Name = ?config(dynamo_name, Config),