diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index 3b07acbe0..88bce879e 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -291,7 +291,7 @@ t_setup_via_config_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(dynamo_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _}}], Trace), ok end ), @@ -328,7 +328,7 @@ t_setup_via_http_api_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(dynamo_connector_query_return, Trace0), - ?assertMatch([#{result := ok}], Trace), + ?assertMatch([#{result := {ok, _}}], Trace), ok end ), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index 5eee882ce..0620030ff 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -22,8 +22,6 @@ on_stop/2, on_query/3, on_batch_query/3, - on_query_async/4, - on_batch_query_async/4, on_get_status/2 ]). @@ -60,7 +58,7 @@ fields(config) -> %% `emqx_resource' API %%======================================================================================== -callback_mode() -> async_if_possible. +callback_mode() -> always_sync. is_buffer_supported() -> false. @@ -117,32 +115,15 @@ on_stop(InstanceId, #{pool_name := PoolName}) -> emqx_resource_pool:stop(PoolName). on_query(InstanceId, Query, State) -> - do_query(InstanceId, Query, sync, State). - -on_query_async(InstanceId, Query, ReplyCtx, State) -> - do_query( - InstanceId, - Query, - {async, ReplyCtx}, - State - ). + do_query(InstanceId, Query, State). %% we only support batch insert on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> - do_query(InstanceId, Query, sync, State); + do_query(InstanceId, Query, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. %% we only support batch insert -on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) -> - do_query( - InstanceId, - Query, - {async, ReplyCtx}, - State - ); -on_batch_query_async(_InstanceId, Query, _Reply, _State) -> - {error, {unrecoverable_error, {invalid_request, Query}}}. on_get_status(_InstanceId, #{pool_name := Pool}) -> Health = emqx_resource_pool:health_check_workers( @@ -160,7 +141,6 @@ status_result(_Status = false) -> connecting. do_query( InstanceId, Query, - ApplyMode, #{pool_name := PoolName, templates := Templates, table := Table} = State ) -> ?TRACE( @@ -170,7 +150,7 @@ do_query( ), Result = ecpool:pick_and_do( PoolName, - {emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]}, + {emqx_ee_connector_dynamo_client, query, [Table, Query, Templates]}, no_handover ), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl index 0340655b4..8f27497fa 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl @@ -9,7 +9,6 @@ -export([ start_link/1, is_connected/1, - query/5, query/4 ]). @@ -28,22 +27,22 @@ -export([execute/2]). -endif. +%% The default timeout for DynamoDB REST API calls is 10 seconds, +%% but this value for `gen_server:call` is 5s, +%% so we should pass the timeout to `gen_server:call` +-define(HEALTH_CHECK_TIMEOUT, 10000). + %%%=================================================================== %%% API %%%=================================================================== is_connected(Pid) -> try - gen_server:call(Pid, is_connected) + gen_server:call(Pid, is_connected, ?HEALTH_CHECK_TIMEOUT) catch _:_ -> false end. -query(Pid, sync, Table, Query, Templates) -> - query(Pid, Table, Query, Templates); -query(Pid, {async, ReplyCtx}, Table, Query, Templates) -> - gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}). - query(Pid, Table, Query, Templates) -> gen_server:call(Pid, {query, Table, Query, Templates}, infinity).