diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index de77b26de..b50788277 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -330,8 +330,6 @@ create_rule_and_action_http(Config) -> %% Testcases %%------------------------------------------------------------------------------ -% Under normal operations, the bridge will be called async via -% `simple_async_query'. t_sync_query(Config) -> ResourceId = resource_id(Config), ?check_trace( @@ -360,48 +358,6 @@ t_sync_query(Config) -> ), ok. -t_async_query(Config) -> - Overrides = #{ - <<"resource_opts">> => #{ - <<"enable_batch">> => <<"false">>, - <<"batch_size">> => 1 - } - }, - ResourceId = resource_id(Config), - ?check_trace( - begin - ?assertMatch({ok, _}, create_bridge_api(Config, Overrides)), - ?retry( - _Sleep = 1_000, - _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) - ), - reset_table(Config), - MsgId = erlang:unique_integer(), - Params = #{ - topic => ?config(mqtt_topic, Config), - id => MsgId, - payload => ?config(oracle_name, Config), - retain => false - }, - Message = {send_message, Params}, - ?assertMatch( - { - ok, - {ok, #{result := {ok, [{affected_rows, 1}]}}} - }, - ?wait_async_action( - emqx_resource:query(ResourceId, Message), - #{?snk_kind := oracle_query}, - 5_000 - ) - ), - ok - end, - [] - ), - ok. - t_batch_sync_query(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), @@ -449,42 +405,6 @@ t_batch_sync_query(Config) -> ), ok. -t_batch_async_query(Config) -> - ResourceId = resource_id(Config), - ?check_trace( - begin - ?assertMatch({ok, _}, create_bridge_api(Config)), - ?retry( - _Sleep = 1_000, - _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) - ), - reset_table(Config), - MsgId = erlang:unique_integer(), - Params = #{ - topic => ?config(mqtt_topic, Config), - id => MsgId, - payload => ?config(oracle_name, Config), - retain => false - }, - Message = {send_message, Params}, - ?assertMatch( - { - ok, - {ok, #{result := {ok, [{affected_rows, 1}]}}} - }, - ?wait_async_action( - emqx_resource:query(ResourceId, Message), - #{?snk_kind := oracle_batch_query}, - 5_000 - ) - ), - ok - end, - [] - ), - ok. - t_create_via_http(Config) -> ?check_trace( begin diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index c39a6a6d7..a0d7169f3 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -23,8 +23,6 @@ on_stop/2, on_query/3, on_batch_query/3, - on_query_async/4, - on_batch_query_async/4, on_get_status/2 ]). @@ -35,7 +33,6 @@ -export([ query/3, execute_batch/3, - do_async_reply/2, do_get_status/1 ]). @@ -46,7 +43,6 @@ -define(ACTION_SEND_MESSAGE, send_message). -define(SYNC_QUERY_MODE, no_handover). --define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}). -define(ORACLE_HOST_OPTIONS, #{ default_port => ?ORACLE_DEFAULT_PORT @@ -67,7 +63,10 @@ batch_params_tokens := params_tokens() }. -callback_mode() -> async_if_possible. +% As ecpool is not monitoring the worker's PID when doing a handover_async, the +% request can be lost if worker crashes. Thus, it's better to force requests to +% be sync for now. +callback_mode() -> always_sync. is_buffer_supported() -> false. @@ -147,24 +146,6 @@ on_query( Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data), handle_result(Res). -on_query_async(InstId, {TypeOrKey, NameOrSQL}, Reply, State) -> - on_query_async(InstId, {TypeOrKey, NameOrSQL, []}, Reply, State); -on_query_async( - InstId, {TypeOrKey, NameOrSQL, Params} = Query, Reply, #{pool_name := PoolName} = State -) -> - ?SLOG(debug, #{ - msg => "oracle database connector received async sql query", - connector => InstId, - query => Query, - reply => Reply, - state => State - }), - ApplyMode = ?ASYNC_QUERY_MODE(Reply), - Type = query, - {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), - Res = on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL2, Data), - handle_result(Res). - on_batch_query( InstId, BatchReq, @@ -207,51 +188,6 @@ on_batch_query( {error, {unrecoverable_error, invalid_request}} end. -on_batch_query_async( - InstId, - BatchReq, - Reply, - #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State -) -> - case BatchReq of - [{Key, _} = Request | _] -> - BinKey = to_bin(Key), - case maps:get(BinKey, Tokens, undefined) of - undefined -> - Log = #{ - connector => InstId, - first_request => Request, - state => State, - msg => "batch prepare not implemented" - }, - ?SLOG(error, Log), - {error, {unrecoverable_error, batch_prepare_not_implemented}}; - TokenList -> - {_, Datas} = lists:unzip(BatchReq), - Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], - St = maps:get(BinKey, Sts), - case - on_sql_query( - InstId, PoolName, execute_batch, ?ASYNC_QUERY_MODE(Reply), St, Datas2 - ) - of - {ok, Results} -> - handle_batch_result(Results, 0); - Result -> - Result - end - end; - _ -> - Log = #{ - connector => InstId, - request => BatchReq, - state => State, - msg => "invalid request" - }, - ?SLOG(error, Log), - {error, {unrecoverable_error, invalid_request}} - end. - proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(TypeOrKey, SQLOrData, Params, #{ @@ -429,6 +365,3 @@ handle_batch_result([{proc_result, RetCode, Reason} | _Rest], _Acc) -> {error, {unrecoverable_error, {RetCode, Reason}}}; handle_batch_result([], Acc) -> {ok, Acc}. - -do_async_reply(Result, {ReplyFun, [Context]}) -> - ReplyFun(Context, Result).