refactor(sqlserver): support only sync mode at connector level

This commit is contained in:
Zaiming (Stone) Shi 2023-04-26 20:44:56 +02:00
parent 7967090de0
commit a8b000f062
1 changed files with 4 additions and 48 deletions

View File

@ -34,8 +34,6 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -43,7 +41,7 @@
-export([connect/1]). -export([connect/1]).
%% Internal exports used to execute code with ecpool worker %% Internal exports used to execute code with ecpool worker
-export([do_get_status/1, worker_do_insert/3, do_async_reply/2]). -export([do_get_status/1, worker_do_insert/3]).
-import(emqx_plugin_libs_rule, [str/1]). -import(emqx_plugin_libs_rule, [str/1]).
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
@ -51,7 +49,6 @@
-define(ACTION_SEND_MESSAGE, send_message). -define(ACTION_SEND_MESSAGE, send_message).
-define(SYNC_QUERY_MODE, handover). -define(SYNC_QUERY_MODE, handover).
-define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}).
-define(SQLSERVER_HOST_OPTIONS, #{ -define(SQLSERVER_HOST_OPTIONS, #{
default_port => 1433 default_port => 1433
@ -169,7 +166,7 @@ server() ->
%% Callbacks defined in emqx_resource %% Callbacks defined in emqx_resource
%%==================================================================== %%====================================================================
callback_mode() -> async_if_possible. callback_mode() -> always_sync.
is_buffer_supported() -> false. is_buffer_supported() -> false.
@ -253,28 +250,6 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
), ),
do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State).
-spec on_query_async(
manager_id(),
{?ACTION_SEND_MESSAGE, map()},
{ReplyFun :: function(), Args :: list()},
state()
) ->
{ok, any()}
| {error, term()}.
on_query_async(
InstanceId,
{?ACTION_SEND_MESSAGE, _Msg} = Query,
ReplyFunAndArgs,
%% #{poolname := PoolName, sql_templates := Templates} = State
State
) ->
?TRACE(
"SINGLE_QUERY_ASYNC",
"bridge_sqlserver_received",
#{requests => Query, connector => InstanceId, state => State}
),
do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
-spec on_batch_query( -spec on_batch_query(
manager_id(), manager_id(),
[{?ACTION_SEND_MESSAGE, map()}], [{?ACTION_SEND_MESSAGE, map()}],
@ -292,20 +267,6 @@ on_batch_query(InstanceId, BatchRequests, State) ->
), ),
do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State).
-spec on_batch_query_async(
manager_id(),
[{?ACTION_SEND_MESSAGE, map()}],
{ReplyFun :: function(), Args :: list()},
state()
) -> {ok, any()}.
on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) ->
?TRACE(
"BATCH_QUERY_ASYNC",
"bridge_sqlserver_received",
#{requests => Requests, connector => InstanceId, state => State}
),
do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
on_get_status(_InstanceId, #{poolname := Pool} = _State) -> on_get_status(_InstanceId, #{poolname := Pool} = _State) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers( Health = emqx_plugin_libs_pool:health_check_ecpool_workers(
Pool, {?MODULE, do_get_status, []} Pool, {?MODULE, do_get_status, []}
@ -365,13 +326,11 @@ conn_str([{password, Password} | Opts], Acc) ->
conn_str([{_, _} | Opts], Acc) -> conn_str([{_, _} | Opts], Acc) ->
conn_str(Opts, Acc). conn_str(Opts, Acc).
%% Sync & Async query with singe & batch sql statement %% Query with singe & batch sql statement
-spec do_query( -spec do_query(
manager_id(), manager_id(),
Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
ApplyMode :: ApplyMode :: handover,
handover
| {handover_async, {?MODULE, do_async_reply, [{ReplyFun :: function(), Args :: list()}]}},
state() state()
) -> ) ->
{ok, list()} {ok, list()}
@ -531,6 +490,3 @@ apply_template(Query, Templates) ->
%% TODO: more detail infomatoin %% TODO: more detail infomatoin
?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}), ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}),
{error, failed_to_apply_sql_template}. {error, failed_to_apply_sql_template}.
do_async_reply(Result, {ReplyFun, Args}) ->
erlang:apply(ReplyFun, Args ++ [Result]).