Merge pull request #10523 from lafirest/fix/rmv_dynamo_async

fix(dynamo): remove all async callbacks of the Dynamo connector
This commit is contained in:
lafirest 2023-04-26 19:25:05 +08:00 committed by GitHub
commit 9da5331ea5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 33 deletions

View File

@ -291,7 +291,7 @@ t_setup_via_config_and_publish(Config) ->
end,
fun(Trace0) ->
Trace = ?of_kind(dynamo_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
?assertMatch([#{result := {ok, _}}], Trace),
ok
end
),
@ -328,7 +328,7 @@ t_setup_via_http_api_and_publish(Config) ->
end,
fun(Trace0) ->
Trace = ?of_kind(dynamo_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
?assertMatch([#{result := {ok, _}}], Trace),
ok
end
),

View File

@ -22,8 +22,6 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2
]).
@ -60,7 +58,7 @@ fields(config) ->
%% `emqx_resource' API
%%========================================================================================
callback_mode() -> async_if_possible.
callback_mode() -> always_sync.
is_buffer_supported() -> false.
@ -115,32 +113,15 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) ->
emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, sync, State).
on_query_async(InstanceId, Query, ReplyCtx, State) ->
do_query(
InstanceId,
Query,
{async, ReplyCtx},
State
).
do_query(InstanceId, Query, State).
%% we only support batch insert
on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) ->
do_query(InstanceId, Query, sync, State);
do_query(InstanceId, Query, State);
on_batch_query(_InstanceId, Query, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}.
%% we only support batch insert
on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) ->
do_query(
InstanceId,
Query,
{async, ReplyCtx},
State
);
on_batch_query_async(_InstanceId, Query, _Reply, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}.
on_get_status(_InstanceId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(
@ -158,7 +139,6 @@ status_result(_Status = false) -> connecting.
do_query(
InstanceId,
Query,
ApplyMode,
#{poolname := PoolName, templates := Templates, table := Table} = State
) ->
?TRACE(
@ -168,7 +148,7 @@ do_query(
),
Result = ecpool:pick_and_do(
PoolName,
{emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]},
{emqx_ee_connector_dynamo_client, query, [Table, Query, Templates]},
no_handover
),

View File

@ -9,7 +9,6 @@
-export([
start_link/1,
is_connected/1,
query/5,
query/4
]).
@ -28,22 +27,22 @@
-export([execute/2]).
-endif.
%% The default timeout for DynamoDB REST API calls is 10 seconds,
%% but this value for `gen_server:call` is 5s,
%% so we should pass the timeout to `gen_server:call`
-define(HEALTH_CHECK_TIMEOUT, 10000).
%%%===================================================================
%%% API
%%%===================================================================
is_connected(Pid) ->
try
gen_server:call(Pid, is_connected)
gen_server:call(Pid, is_connected, ?HEALTH_CHECK_TIMEOUT)
catch
_:_ ->
false
end.
query(Pid, sync, Table, Query, Templates) ->
query(Pid, Table, Query, Templates);
query(Pid, {async, ReplyCtx}, Table, Query, Templates) ->
gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}).
query(Pid, Table, Query, Templates) ->
gen_server:call(Pid, {query, Table, Query, Templates}, infinity).