fix(dynamo): remove all async callbacks of the Dynamo connector

This commit is contained in:
firest 2023-04-26 14:44:08 +08:00
parent c4081f9211
commit e467e082f0
2 changed files with 10 additions and 31 deletions

View File

@ -22,8 +22,6 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -60,7 +58,7 @@ fields(config) ->
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
callback_mode() -> async_if_possible. callback_mode() -> always_sync.
is_buffer_supported() -> false. is_buffer_supported() -> false.
@ -115,32 +113,15 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) ->
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstanceId, Query, State) -> on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, sync, State). do_query(InstanceId, Query, State).
on_query_async(InstanceId, Query, ReplyCtx, State) ->
do_query(
InstanceId,
Query,
{async, ReplyCtx},
State
).
%% we only support batch insert %% we only support batch insert
on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) ->
do_query(InstanceId, Query, sync, State); do_query(InstanceId, Query, State);
on_batch_query(_InstanceId, Query, _State) -> on_batch_query(_InstanceId, Query, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}. {error, {unrecoverable_error, {invalid_request, Query}}}.
%% we only support batch insert %% we only support batch insert
on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) ->
do_query(
InstanceId,
Query,
{async, ReplyCtx},
State
);
on_batch_query_async(_InstanceId, Query, _Reply, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}.
on_get_status(_InstanceId, #{poolname := Pool}) -> on_get_status(_InstanceId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers( Health = emqx_plugin_libs_pool:health_check_ecpool_workers(
@ -158,7 +139,6 @@ status_result(_Status = false) -> connecting.
do_query( do_query(
InstanceId, InstanceId,
Query, Query,
ApplyMode,
#{poolname := PoolName, templates := Templates, table := Table} = State #{poolname := PoolName, templates := Templates, table := Table} = State
) -> ) ->
?TRACE( ?TRACE(
@ -168,7 +148,7 @@ do_query(
), ),
Result = ecpool:pick_and_do( Result = ecpool:pick_and_do(
PoolName, PoolName,
{emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]}, {emqx_ee_connector_dynamo_client, query, [Table, Query, Templates]},
no_handover no_handover
), ),

View File

@ -9,7 +9,6 @@
-export([ -export([
start_link/1, start_link/1,
is_connected/1, is_connected/1,
query/5,
query/4 query/4
]). ]).
@ -28,22 +27,22 @@
-export([execute/2]). -export([execute/2]).
-endif. -endif.
%% The default timeout for DynamoDB REST API calls is 10 seconds,
%% but this value for `gen_server:call` is 5s,
%% so we should pass the timeout to `gen_server:call`
-define(HEALTH_CHECK_TIMEOUT, 10000).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
is_connected(Pid) -> is_connected(Pid) ->
try try
gen_server:call(Pid, is_connected) gen_server:call(Pid, is_connected, ?HEALTH_CHECK_TIMEOUT)
catch catch
_:_ -> _:_ ->
false false
end. end.
query(Pid, sync, Table, Query, Templates) ->
query(Pid, Table, Query, Templates);
query(Pid, {async, ReplyCtx}, Table, Query, Templates) ->
gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}).
query(Pid, Table, Query, Templates) -> query(Pid, Table, Query, Templates) ->
gen_server:call(Pid, {query, Table, Query, Templates}, infinity). gen_server:call(Pid, {query, Table, Query, Templates}, infinity).