From a8b000f06291a80e999b0c16b5e8adf0e0acaf7f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 26 Apr 2023 20:44:56 +0200 Subject: [PATCH] refactor(sqlserver): support only sync mode at connector level --- .../src/emqx_ee_connector_sqlserver.erl | 52 ++----------------- 1 file changed, 4 insertions(+), 48 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 6cbd9de4e..97a46152d 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -34,8 +34,6 @@ on_stop/2, on_query/3, on_batch_query/3, - on_query_async/4, - on_batch_query_async/4, on_get_status/2 ]). @@ -43,7 +41,7 @@ -export([connect/1]). %% Internal exports used to execute code with ecpool worker --export([do_get_status/1, worker_do_insert/3, do_async_reply/2]). +-export([do_get_status/1, worker_do_insert/3]). -import(emqx_plugin_libs_rule, [str/1]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -51,7 +49,6 @@ -define(ACTION_SEND_MESSAGE, send_message). -define(SYNC_QUERY_MODE, handover). --define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}). -define(SQLSERVER_HOST_OPTIONS, #{ default_port => 1433 @@ -169,7 +166,7 @@ server() -> %% Callbacks defined in emqx_resource %%==================================================================== -callback_mode() -> async_if_possible. +callback_mode() -> always_sync. is_buffer_supported() -> false. @@ -253,28 +250,6 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> ), do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). --spec on_query_async( - manager_id(), - {?ACTION_SEND_MESSAGE, map()}, - {ReplyFun :: function(), Args :: list()}, - state() -) -> - {ok, any()} - | {error, term()}. -on_query_async( - InstanceId, - {?ACTION_SEND_MESSAGE, _Msg} = Query, - ReplyFunAndArgs, - %% #{poolname := PoolName, sql_templates := Templates} = State - State -) -> - ?TRACE( - "SINGLE_QUERY_ASYNC", - "bridge_sqlserver_received", - #{requests => Query, connector => InstanceId, state => State} - ), - do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). - -spec on_batch_query( manager_id(), [{?ACTION_SEND_MESSAGE, map()}], @@ -292,20 +267,6 @@ on_batch_query(InstanceId, BatchRequests, State) -> ), do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). --spec on_batch_query_async( - manager_id(), - [{?ACTION_SEND_MESSAGE, map()}], - {ReplyFun :: function(), Args :: list()}, - state() -) -> {ok, any()}. -on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> - ?TRACE( - "BATCH_QUERY_ASYNC", - "bridge_sqlserver_received", - #{requests => Requests, connector => InstanceId, state => State} - ), - do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). - on_get_status(_InstanceId, #{poolname := Pool} = _State) -> Health = emqx_plugin_libs_pool:health_check_ecpool_workers( Pool, {?MODULE, do_get_status, []} @@ -365,13 +326,11 @@ conn_str([{password, Password} | Opts], Acc) -> conn_str([{_, _} | Opts], Acc) -> conn_str(Opts, Acc). -%% Sync & Async query with singe & batch sql statement +%% Query with singe & batch sql statement -spec do_query( manager_id(), Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], - ApplyMode :: - handover - | {handover_async, {?MODULE, do_async_reply, [{ReplyFun :: function(), Args :: list()}]}}, + ApplyMode :: handover, state() ) -> {ok, list()} @@ -531,6 +490,3 @@ apply_template(Query, Templates) -> %% TODO: more detail infomatoin ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}), {error, failed_to_apply_sql_template}. - -do_async_reply(Result, {ReplyFun, Args}) -> - erlang:apply(ReplyFun, Args ++ [Result]).