refactor: rename instance_id, which is also PoolName

This commit is contained in:
JimMoen 2023-04-14 01:39:08 +08:00
parent 2d1b94f7f7
commit 65c4234829
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
1 changed files with 25 additions and 26 deletions

View File

@ -74,7 +74,6 @@
%% Copied from odbc reference page %% Copied from odbc reference page
%% https://www.erlang.org/doc/man/odbc.html %% https://www.erlang.org/doc/man/odbc.html
-type bridge_id() :: binary().
%% as returned by connect/2 %% as returned by connect/2
-type connection_reference() :: pid(). -type connection_reference() :: pid().
-type time_out() :: milliseconds() | infinity. -type time_out() :: milliseconds() | infinity.
@ -175,7 +174,7 @@ callback_mode() -> async_if_possible.
is_buffer_supported() -> false. is_buffer_supported() -> false.
on_start( on_start(
BridgeId = PoolName, InstanceId = PoolName,
#{ #{
server := Server, server := Server,
username := Username, username := Username,
@ -188,7 +187,7 @@ on_start(
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_sqlserver_connector", msg => "starting_sqlserver_connector",
connector => BridgeId, connector => InstanceId,
config => emqx_misc:redact(Config) config => emqx_misc:redact(Config)
}), }),
@ -214,7 +213,7 @@ on_start(
], ],
State = #{ State = #{
%% also BridgeId %% also InstanceId
poolname => PoolName, poolname => PoolName,
sql_templates => parse_sql_template(Config), sql_templates => parse_sql_template(Config),
resource_opts => ResourceOpts resource_opts => ResourceOpts
@ -238,7 +237,7 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) ->
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
-spec on_query( -spec on_query(
bridge_id(), manager_id(),
{?ACTION_SEND_MESSAGE, map()}, {?ACTION_SEND_MESSAGE, map()},
state() state()
) -> ) ->
@ -246,16 +245,16 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) ->
| {ok, list()} | {ok, list()}
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
on_query(BridgeId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
?TRACE( ?TRACE(
"SINGLE_QUERY_SYNC", "SINGLE_QUERY_SYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => Query, connector => BridgeId, state => State} #{requests => Query, connector => InstanceId, state => State}
), ),
do_query(BridgeId, Query, ?SYNC_QUERY_MODE, State). do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State).
-spec on_query_async( -spec on_query_async(
bridge_id(), manager_id(),
{?ACTION_SEND_MESSAGE, map()}, {?ACTION_SEND_MESSAGE, map()},
{ReplyFun :: function(), Args :: list()}, {ReplyFun :: function(), Args :: list()},
state() state()
@ -263,7 +262,7 @@ on_query(BridgeId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
{ok, any()} {ok, any()}
| {error, term()}. | {error, term()}.
on_query_async( on_query_async(
BridgeId, InstanceId,
{?ACTION_SEND_MESSAGE, _Msg} = Query, {?ACTION_SEND_MESSAGE, _Msg} = Query,
ReplyFunAndArgs, ReplyFunAndArgs,
%% #{poolname := PoolName, sql_templates := Templates} = State %% #{poolname := PoolName, sql_templates := Templates} = State
@ -272,12 +271,12 @@ on_query_async(
?TRACE( ?TRACE(
"SINGLE_QUERY_ASYNC", "SINGLE_QUERY_ASYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => Query, connector => BridgeId, state => State} #{requests => Query, connector => InstanceId, state => State}
), ),
do_query(BridgeId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
-spec on_batch_query( -spec on_batch_query(
bridge_id(), manager_id(),
[{?ACTION_SEND_MESSAGE, map()}], [{?ACTION_SEND_MESSAGE, map()}],
state() state()
) -> ) ->
@ -285,27 +284,27 @@ on_query_async(
| {ok, list()} | {ok, list()}
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
on_batch_query(BridgeId, BatchRequests, State) -> on_batch_query(InstanceId, BatchRequests, State) ->
?TRACE( ?TRACE(
"BATCH_QUERY_SYNC", "BATCH_QUERY_SYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => BatchRequests, connector => BridgeId, state => State} #{requests => BatchRequests, connector => InstanceId, state => State}
), ),
do_query(BridgeId, BatchRequests, ?SYNC_QUERY_MODE, State). do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State).
-spec on_batch_query_async( -spec on_batch_query_async(
bridge_id(), manager_id(),
[{?ACTION_SEND_MESSAGE, map()}], [{?ACTION_SEND_MESSAGE, map()}],
{ReplyFun :: function(), Args :: list()}, {ReplyFun :: function(), Args :: list()},
state() state()
) -> {ok, any()}. ) -> {ok, any()}.
on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) -> on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) ->
?TRACE( ?TRACE(
"BATCH_QUERY_ASYNC", "BATCH_QUERY_ASYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => Requests, connector => BridgeId, state => State} #{requests => Requests, connector => InstanceId, state => State}
), ),
do_query(BridgeId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) -> on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) ->
RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts),
@ -369,7 +368,7 @@ conn_str([{_, _} | Opts], Acc) ->
%% Sync & Async query with singe & batch sql statement %% Sync & Async query with singe & batch sql statement
-spec do_query( -spec do_query(
bridge_id(), manager_id(),
Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
ApplyMode :: ApplyMode ::
handover handover
@ -380,7 +379,7 @@ conn_str([{_, _} | Opts], Acc) ->
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
do_query( do_query(
BridgeId, InstanceId,
Query, Query,
ApplyMode, ApplyMode,
#{poolname := PoolName, sql_templates := Templates} = State #{poolname := PoolName, sql_templates := Templates} = State
@ -388,7 +387,7 @@ do_query(
?TRACE( ?TRACE(
"SINGLE_QUERY_SYNC", "SINGLE_QUERY_SYNC",
"sqlserver_connector_received", "sqlserver_connector_received",
#{query => Query, connector => BridgeId, state => State} #{query => Query, connector => InstanceId, state => State}
), ),
%% only insert sql statement for single query and batch query %% only insert sql statement for single query and batch query
@ -412,7 +411,7 @@ do_query(
), ),
?SLOG(error, #{ ?SLOG(error, #{
msg => "sqlserver_connector_do_query_failed", msg => "sqlserver_connector_do_query_failed",
connector => BridgeId, connector => InstanceId,
query => Query, query => Query,
reason => Reason reason => Reason
}), }),
@ -426,9 +425,9 @@ do_query(
end. end.
worker_do_insert( worker_do_insert(
Conn, SQL, #{resource_opts := ResourceOpts, poolname := BridgeId} = State Conn, SQL, #{resource_opts := ResourceOpts, poolname := InstanceId} = State
) -> ) ->
LogMeta = #{connector => BridgeId, state => State}, LogMeta = #{connector => InstanceId, state => State},
try try
case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of
{selected, Rows, _} -> {selected, Rows, _} ->