From 65c4234829a9f52758803b4987a8275e8e90a24d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 14 Apr 2023 01:39:08 +0800 Subject: [PATCH] refactor: rename instance_id, which is also PoolName --- .../src/emqx_ee_connector_sqlserver.erl | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index e48570c96..586444947 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -74,7 +74,6 @@ %% Copied from odbc reference page %% https://www.erlang.org/doc/man/odbc.html --type bridge_id() :: binary(). %% as returned by connect/2 -type connection_reference() :: pid(). -type time_out() :: milliseconds() | infinity. @@ -175,7 +174,7 @@ callback_mode() -> async_if_possible. is_buffer_supported() -> false. on_start( - BridgeId = PoolName, + InstanceId = PoolName, #{ server := Server, username := Username, @@ -188,7 +187,7 @@ on_start( ) -> ?SLOG(info, #{ msg => "starting_sqlserver_connector", - connector => BridgeId, + connector => InstanceId, config => emqx_misc:redact(Config) }), @@ -214,7 +213,7 @@ on_start( ], State = #{ - %% also BridgeId + %% also InstanceId poolname => PoolName, sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts @@ -238,7 +237,7 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) -> emqx_plugin_libs_pool:stop_pool(PoolName). -spec on_query( - bridge_id(), + manager_id(), {?ACTION_SEND_MESSAGE, map()}, state() ) -> @@ -246,16 +245,16 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) -> | {ok, list()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(BridgeId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> +on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> ?TRACE( "SINGLE_QUERY_SYNC", "bridge_sqlserver_received", - #{requests => Query, connector => BridgeId, state => State} + #{requests => Query, connector => InstanceId, state => State} ), - do_query(BridgeId, Query, ?SYNC_QUERY_MODE, State). + do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). -spec on_query_async( - bridge_id(), + manager_id(), {?ACTION_SEND_MESSAGE, map()}, {ReplyFun :: function(), Args :: list()}, state() @@ -263,7 +262,7 @@ on_query(BridgeId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> {ok, any()} | {error, term()}. on_query_async( - BridgeId, + InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, ReplyFunAndArgs, %% #{poolname := PoolName, sql_templates := Templates} = State @@ -272,12 +271,12 @@ on_query_async( ?TRACE( "SINGLE_QUERY_ASYNC", "bridge_sqlserver_received", - #{requests => Query, connector => BridgeId, state => State} + #{requests => Query, connector => InstanceId, state => State} ), - do_query(BridgeId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -spec on_batch_query( - bridge_id(), + manager_id(), [{?ACTION_SEND_MESSAGE, map()}], state() ) -> @@ -285,27 +284,27 @@ on_query_async( | {ok, list()} | {error, {recoverable_error, term()}} | {error, term()}. -on_batch_query(BridgeId, BatchRequests, State) -> +on_batch_query(InstanceId, BatchRequests, State) -> ?TRACE( "BATCH_QUERY_SYNC", "bridge_sqlserver_received", - #{requests => BatchRequests, connector => BridgeId, state => State} + #{requests => BatchRequests, connector => InstanceId, state => State} ), - do_query(BridgeId, BatchRequests, ?SYNC_QUERY_MODE, State). + do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). -spec on_batch_query_async( - bridge_id(), + manager_id(), [{?ACTION_SEND_MESSAGE, map()}], {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, any()}. -on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) -> +on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> ?TRACE( "BATCH_QUERY_ASYNC", "bridge_sqlserver_received", - #{requests => Requests, connector => BridgeId, state => State} + #{requests => Requests, connector => InstanceId, state => State} ), - do_query(BridgeId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) -> RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), @@ -369,7 +368,7 @@ conn_str([{_, _} | Opts], Acc) -> %% Sync & Async query with singe & batch sql statement -spec do_query( - bridge_id(), + manager_id(), Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], ApplyMode :: handover @@ -380,7 +379,7 @@ conn_str([{_, _} | Opts], Acc) -> | {error, {recoverable_error, term()}} | {error, term()}. do_query( - BridgeId, + InstanceId, Query, ApplyMode, #{poolname := PoolName, sql_templates := Templates} = State @@ -388,7 +387,7 @@ do_query( ?TRACE( "SINGLE_QUERY_SYNC", "sqlserver_connector_received", - #{query => Query, connector => BridgeId, state => State} + #{query => Query, connector => InstanceId, state => State} ), %% only insert sql statement for single query and batch query @@ -412,7 +411,7 @@ do_query( ), ?SLOG(error, #{ msg => "sqlserver_connector_do_query_failed", - connector => BridgeId, + connector => InstanceId, query => Query, reason => Reason }), @@ -426,9 +425,9 @@ do_query( end. worker_do_insert( - Conn, SQL, #{resource_opts := ResourceOpts, poolname := BridgeId} = State + Conn, SQL, #{resource_opts := ResourceOpts, poolname := InstanceId} = State ) -> - LogMeta = #{connector => BridgeId, state => State}, + LogMeta = #{connector => InstanceId, state => State}, try case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of {selected, Rows, _} ->