diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 98f3e497d..be5e56e85 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -38,7 +38,6 @@ }. -type state() :: #{ connect_timeout := timer:time(), - instance_id := manager_id(), jwt_worker_id := jwt_worker(), max_retries := non_neg_integer(), payload_template := emqx_plugin_libs_rule:tmpl_token(), @@ -61,9 +60,9 @@ is_buffer_supported() -> false. callback_mode() -> async_if_possible. --spec on_start(manager_id(), config()) -> {ok, state()} | {error, term()}. +-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. on_start( - InstanceId, + ResourceId, #{ connect_timeout := ConnectTimeout, max_retries := MaxRetries, @@ -75,7 +74,7 @@ on_start( ) -> ?SLOG(info, #{ msg => "starting_gcp_pubsub_bridge", - connector => InstanceId, + connector => ResourceId, config => Config }), %% emulating the emulator behavior @@ -100,14 +99,13 @@ on_start( #{ jwt_worker_id := JWTWorkerId, project_id := ProjectId - } = ensure_jwt_worker(InstanceId, Config), + } = ensure_jwt_worker(ResourceId, Config), State = #{ connect_timeout => ConnectTimeout, - instance_id => InstanceId, jwt_worker_id => JWTWorkerId, max_retries => MaxRetries, payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), - pool_name => InstanceId, + pool_name => ResourceId, project_id => ProjectId, pubsub_topic => PubSubTopic, request_timeout => RequestTimeout @@ -115,39 +113,39 @@ on_start( ?tp( gcp_pubsub_on_start_before_starting_pool, #{ - instance_id => InstanceId, - pool_name => InstanceId, + resource_id => ResourceId, + pool_name => ResourceId, pool_opts => PoolOpts } ), - ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => InstanceId}), - case ehttpc_sup:start_pool(InstanceId, PoolOpts) of + ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => ResourceId}), + case ehttpc_sup:start_pool(ResourceId, PoolOpts) of {ok, _} -> {ok, State}; {error, {already_started, _}} -> - ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => InstanceId}), + ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => ResourceId}), {ok, State}; {error, Reason} -> ?tp(gcp_pubsub_ehttpc_pool_start_failure, #{ - pool_name => InstanceId, + pool_name => ResourceId, reason => Reason }), {error, Reason} end. --spec on_stop(manager_id(), state()) -> ok | {error, term()}. +-spec on_stop(resource_id(), state()) -> ok | {error, term()}. on_stop( - InstanceId, - _State = #{jwt_worker_id := JWTWorkerId, pool_name := PoolName} + ResourceId, + _State = #{jwt_worker_id := JWTWorkerId} ) -> - ?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}), + ?tp(gcp_pubsub_stop, #{resource_id => ResourceId, jwt_worker_id => JWTWorkerId}), ?SLOG(info, #{ msg => "stopping_gcp_pubsub_bridge", - connector => InstanceId + connector => ResourceId }), emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), - emqx_connector_jwt:delete_jwt(?JWT_TABLE, InstanceId), - ehttpc_sup:stop_pool(PoolName). + emqx_connector_jwt:delete_jwt(?JWT_TABLE, ResourceId), + ehttpc_sup:stop_pool(ResourceId). -spec on_query( resource_id(), @@ -213,9 +211,9 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ), do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). --spec on_get_status(manager_id(), state()) -> connected | disconnected. -on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = State) -> - case do_get_status(InstanceId, PoolName, Timeout) of +-spec on_get_status(resource_id(), state()) -> connected | disconnected. +on_get_status(ResourceId, #{connect_timeout := Timeout} = State) -> + case do_get_status(ResourceId, Timeout) of true -> connected; false -> @@ -230,12 +228,12 @@ on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = %% Helper fns %%------------------------------------------------------------------------------------------------- --spec ensure_jwt_worker(manager_id(), config()) -> +-spec ensure_jwt_worker(resource_id(), config()) -> #{ jwt_worker_id := jwt_worker(), project_id := binary() }. -ensure_jwt_worker(InstanceId, #{ +ensure_jwt_worker(ResourceId, #{ service_account_json := ServiceAccountJSON }) -> #{ @@ -250,7 +248,7 @@ ensure_jwt_worker(InstanceId, #{ Alg = <<"RS256">>, Config = #{ private_key => PrivateKeyPEM, - resource_id => InstanceId, + resource_id => ResourceId, expiration => ExpirationMS, table => ?JWT_TABLE, iss => ServiceAccountEmail, @@ -260,14 +258,14 @@ ensure_jwt_worker(InstanceId, #{ alg => Alg }, - JWTWorkerId = <<"gcp_pubsub_jwt_worker:", InstanceId/binary>>, + JWTWorkerId = <<"gcp_pubsub_jwt_worker:", ResourceId/binary>>, Worker = case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of {ok, Worker0} -> Worker0; Error -> ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ - connector => InstanceId, + connector => ResourceId, reason => Error }), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), @@ -281,18 +279,18 @@ ensure_jwt_worker(InstanceId, #{ %% produced by the worker. receive {Ref, token_created} -> - ?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => InstanceId}), + ?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => ResourceId}), demonitor(MRef, [flush]), ok; {'DOWN', MRef, process, Worker, Reason} -> ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ - connector => InstanceId, + connector => ResourceId, reason => Reason }), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), throw(failed_to_start_jwt_worker) after 10_000 -> - ?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => InstanceId}), + ?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => ResourceId}), demonitor(MRef, [flush]), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), throw(timeout_creating_jwt) @@ -325,8 +323,8 @@ publish_path( <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. -spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}]. -get_jwt_authorization_header(InstanceId) -> - case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, InstanceId) of +get_jwt_authorization_header(ResourceId) -> + case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, ResourceId) of %% Since we synchronize the JWT creation during resource start %% (see `on_start/2'), this will be always be populated. {ok, JWT} -> @@ -345,7 +343,6 @@ get_jwt_authorization_header(InstanceId) -> do_send_requests_sync(State, Requests, ResourceId) -> #{ pool_name := PoolName, - instance_id := InstanceId, max_retries := MaxRetries, request_timeout := RequestTimeout } = State, @@ -353,12 +350,11 @@ do_send_requests_sync(State, Requests, ResourceId) -> gcp_pubsub_bridge_do_send_requests, #{ query_mode => sync, - instance_id => InstanceId, resource_id => ResourceId, requests => Requests } ), - Headers = get_jwt_authorization_header(InstanceId), + Headers = get_jwt_authorization_header(ResourceId), Payloads = lists:map( fun({send_message, Selected}) -> @@ -471,19 +467,17 @@ do_send_requests_sync(State, Requests, ResourceId) -> do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> #{ pool_name := PoolName, - instance_id := InstanceId, request_timeout := RequestTimeout } = State, ?tp( gcp_pubsub_bridge_do_send_requests, #{ query_mode => async, - instance_id => InstanceId, resource_id => ResourceId, requests => Requests } ), - Headers = get_jwt_authorization_header(InstanceId), + Headers = get_jwt_authorization_header(ResourceId), Payloads = lists:map( fun({send_message, Selected}) -> @@ -541,9 +535,9 @@ reply_delegator(_ResourceId, ReplyFunAndArgs, Result) -> emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) end. --spec do_get_status(manager_id(), binary(), timer:time()) -> boolean(). -do_get_status(InstanceId, PoolName, Timeout) -> - Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], +-spec do_get_status(resource_id(), timer:time()) -> boolean(). +do_get_status(ResourceId, Timeout) -> + Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ResourceId)], DoPerWorker = fun(Worker) -> case ehttpc:health_check(Worker, Timeout) of @@ -552,7 +546,7 @@ do_get_status(InstanceId, PoolName, Timeout) -> {error, Reason} -> ?SLOG(error, #{ msg => "ehttpc_health_check_failed", - instance_id => InstanceId, + connector => ResourceId, reason => Reason, worker => Worker }), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c549b3467..f7958af81 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -114,8 +114,8 @@ callback_mode() -> is_buffer_supported() -> true. --spec on_start(manager_id(), config()) -> {ok, state()}. -on_start(InstanceId, Config) -> +-spec on_start(resource_id(), config()) -> {ok, state()}. +on_start(ResourceId, Config) -> #{ authentication := Auth, bootstrap_hosts := BootstrapHosts0, @@ -133,7 +133,7 @@ on_start(InstanceId, Config) -> BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), KafkaType = kafka_consumer, %% Note: this is distinct per node. - ClientID = make_client_id(InstanceId, KafkaType, BridgeName), + ClientID = make_client_id(ResourceId, KafkaType, BridgeName), ClientOpts0 = case Auth of none -> []; @@ -144,26 +144,26 @@ on_start(InstanceId, Config) -> ok -> ?tp( kafka_consumer_client_started, - #{client_id => ClientID, instance_id => InstanceId} + #{client_id => ClientID, resource_id => ResourceId} ), ?SLOG(info, #{ msg => "kafka_consumer_client_started", - instance_id => InstanceId, + resource_id => ResourceId, kafka_hosts => BootstrapHosts }); {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer_client", - instance_id => InstanceId, + resource_id => ResourceId, kafka_hosts => BootstrapHosts, reason => emqx_utils:redact(Reason) }), throw(?CLIENT_DOWN_MESSAGE) end, - start_consumer(Config, InstanceId, ClientID). + start_consumer(Config, ResourceId, ClientID). --spec on_stop(manager_id(), state()) -> ok. -on_stop(_InstanceID, State) -> +-spec on_stop(resource_id(), state()) -> ok. +on_stop(_ResourceID, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID @@ -172,8 +172,8 @@ on_stop(_InstanceID, State) -> stop_client(ClientID), ok. --spec on_get_status(manager_id(), state()) -> connected | disconnected. -on_get_status(_InstanceID, State) -> +-spec on_get_status(resource_id(), state()) -> connected | disconnected. +on_get_status(_ResourceID, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID, @@ -271,8 +271,8 @@ ensure_consumer_supervisor_started() -> ok end. --spec start_consumer(config(), manager_id(), brod:client_id()) -> {ok, state()}. -start_consumer(Config, InstanceId, ClientID) -> +-spec start_consumer(config(), resource_id(), brod:client_id()) -> {ok, state()}. +start_consumer(Config, ResourceId, ClientID) -> #{ bootstrap_hosts := BootstrapHosts0, bridge_name := BridgeName, @@ -292,7 +292,7 @@ start_consumer(Config, InstanceId, ClientID) -> InitialState = #{ key_encoding_mode => KeyEncodingMode, hookpoint => Hookpoint, - resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName), + resource_id => ResourceId, topic_mapping => TopicMapping, value_encoding_mode => ValueEncodingMode }, @@ -337,7 +337,7 @@ start_consumer(Config, InstanceId, ClientID) -> {ok, _ConsumerPid} -> ?tp( kafka_consumer_subscriber_started, - #{instance_id => InstanceId, subscriber_id => SubscriberId} + #{resource_id => ResourceId, subscriber_id => SubscriberId} ), {ok, #{ subscriber_id => SubscriberId, @@ -347,7 +347,7 @@ start_consumer(Config, InstanceId, ClientID) -> {error, Reason2} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer", - instance_id => InstanceId, + resource_id => ResourceId, kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0), reason => emqx_utils:redact(Reason2) }), @@ -471,19 +471,19 @@ consumer_group_id(BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. --spec is_dry_run(manager_id()) -> boolean(). -is_dry_run(InstanceId) -> - TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), +-spec is_dry_run(resource_id()) -> boolean(). +is_dry_run(ResourceId) -> + TestIdStart = string:find(ResourceId, ?TEST_ID_PREFIX), case TestIdStart of nomatch -> false; _ -> - string:equal(TestIdStart, InstanceId) + string:equal(TestIdStart, ResourceId) end. --spec make_client_id(manager_id(), kafka_consumer, atom() | binary()) -> atom(). -make_client_id(InstanceId, KafkaType, KafkaName) -> - case is_dry_run(InstanceId) of +-spec make_client_id(resource_id(), kafka_consumer, atom() | binary()) -> atom(). +make_client_id(ResourceId, KafkaType, KafkaName) -> + case is_dry_run(ResourceId) of false -> ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName), binary_to_atom(ClientID0); diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 27d50f077..300fe9b2d 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -70,7 +70,7 @@ callback_mode() -> async_if_possible. %% workers. is_buffer_supported() -> true. --spec on_start(manager_id(), config()) -> {ok, state()}. +-spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> #{ authentication := _Auth, @@ -106,7 +106,7 @@ on_start(InstanceId, Config) -> end, start_producer(Config, InstanceId, ClientId, ClientOpts). --spec on_stop(manager_id(), state()) -> ok. +-spec on_stop(resource_id(), state()) -> ok. on_stop(_InstanceId, State) -> #{ pulsar_client_id := ClientId, @@ -117,7 +117,7 @@ on_stop(_InstanceId, State) -> ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}), ok. --spec on_get_status(manager_id(), state()) -> connected | disconnected. +-spec on_get_status(resource_id(), state()) -> connected | disconnected. on_get_status(_InstanceId, State = #{}) -> #{ pulsar_client_id := ClientId, @@ -144,7 +144,7 @@ on_get_status(_InstanceId, _State) -> %% create the bridge is not quite finished, `State = undefined'. connecting. --spec on_query(manager_id(), {send_message, map()}, state()) -> +-spec on_query(resource_id(), {send_message, map()}, state()) -> {ok, term()} | {error, timeout} | {error, term()}. @@ -163,7 +163,7 @@ on_query(_InstanceId, {send_message, Message}, State) -> end. -spec on_query_async( - manager_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() + resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> @@ -203,7 +203,7 @@ format_servers(Servers0) -> Servers1 ). --spec make_client_id(manager_id(), atom() | binary()) -> pulsar_client_id(). +-spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id(). make_client_id(InstanceId, BridgeName) -> case is_dry_run(InstanceId) of true -> @@ -218,7 +218,7 @@ make_client_id(InstanceId, BridgeName) -> binary_to_atom(ClientIdBin) end. --spec is_dry_run(manager_id()) -> boolean(). +-spec is_dry_run(resource_id()) -> boolean(). is_dry_run(InstanceId) -> TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), case TestIdStart of @@ -255,7 +255,7 @@ producer_name(ClientId) -> ]) ). --spec start_producer(config(), manager_id(), pulsar_client_id(), map()) -> {ok, state()}. +-spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}. start_producer(Config, InstanceId, ClientId, ClientOpts) -> #{ conn_opts := ConnOpts, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 91572eac3..e6f86fb59 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- -type resource_type() :: module(). -type resource_id() :: binary(). --type manager_id() :: binary(). -type raw_resource_config() :: binary() | raw_term_resource_config(). -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()]. -type resource_config() :: term(). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 6f72f8a16..7c48e8ee4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -367,9 +367,9 @@ is_buffer_supported(Module) -> -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. -call_start(MgrId, Mod, Config) -> +call_start(ResId, Mod, Config) -> try - Mod:on_start(MgrId, Config) + Mod:on_start(ResId, Config) catch throw:Error -> {error, Error}; @@ -382,12 +382,12 @@ call_start(MgrId, Mod, Config) -> | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()} | {error, term()}. -call_health_check(MgrId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)). +call_health_check(ResId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_get_status(ResId, ResourceState)). -spec call_stop(resource_id(), module(), resource_state()) -> term(). -call_stop(MgrId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)). +call_stop(ResId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_stop(ResId, ResourceState)). -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 90d90cb36..61e7b24c8 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -174,7 +174,7 @@ callback_mode() -> async_if_possible. is_buffer_supported() -> false. on_start( - InstanceId = PoolName, + ResourceId = PoolName, #{ server := Server, username := Username, @@ -187,7 +187,7 @@ on_start( ) -> ?SLOG(info, #{ msg => "starting_sqlserver_connector", - connector => InstanceId, + connector => ResourceId, config => emqx_utils:redact(Config) }), @@ -212,7 +212,7 @@ on_start( ], State = #{ - %% also InstanceId + %% also ResourceId pool_name => PoolName, sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts @@ -228,15 +228,15 @@ on_start( {error, Reason} end. -on_stop(InstanceId, #{pool_name := PoolName} = _State) -> +on_stop(ResourceId, _State) -> ?SLOG(info, #{ msg => "stopping_sqlserver_connector", - connector => InstanceId + connector => ResourceId }), - emqx_resource_pool:stop(PoolName). + emqx_resource_pool:stop(ResourceId). -spec on_query( - manager_id(), + resource_id(), {?ACTION_SEND_MESSAGE, map()}, state() ) -> @@ -244,16 +244,16 @@ on_stop(InstanceId, #{pool_name := PoolName} = _State) -> | {ok, list()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> +on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> ?TRACE( "SINGLE_QUERY_SYNC", "bridge_sqlserver_received", - #{requests => Query, connector => InstanceId, state => State} + #{requests => Query, connector => ResourceId, state => State} ), - do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). + do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State). -spec on_query_async( - manager_id(), + resource_id(), {?ACTION_SEND_MESSAGE, map()}, {ReplyFun :: function(), Args :: list()}, state() @@ -261,7 +261,7 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> {ok, any()} | {error, term()}. on_query_async( - InstanceId, + ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, ReplyFunAndArgs, State @@ -269,12 +269,12 @@ on_query_async( ?TRACE( "SINGLE_QUERY_ASYNC", "bridge_sqlserver_received", - #{requests => Query, connector => InstanceId, state => State} + #{requests => Query, connector => ResourceId, state => State} ), - do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + do_query(ResourceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -spec on_batch_query( - manager_id(), + resource_id(), [{?ACTION_SEND_MESSAGE, map()}], state() ) -> @@ -282,29 +282,29 @@ on_query_async( | {ok, list()} | {error, {recoverable_error, term()}} | {error, term()}. -on_batch_query(InstanceId, BatchRequests, State) -> +on_batch_query(ResourceId, BatchRequests, State) -> ?TRACE( "BATCH_QUERY_SYNC", "bridge_sqlserver_received", - #{requests => BatchRequests, connector => InstanceId, state => State} + #{requests => BatchRequests, connector => ResourceId, state => State} ), - do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). + do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State). -spec on_batch_query_async( - manager_id(), + resource_id(), [{?ACTION_SEND_MESSAGE, map()}], {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, any()}. -on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> +on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ?TRACE( "BATCH_QUERY_ASYNC", "bridge_sqlserver_received", - #{requests => Requests, connector => InstanceId, state => State} + #{requests => Requests, connector => ResourceId, state => State} ), - do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + do_query(ResourceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> +on_get_status(_ResourceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, {?MODULE, do_get_status, []} @@ -366,7 +366,7 @@ conn_str([{_, _} | Opts], Acc) -> %% Sync & Async query with singe & batch sql statement -spec do_query( - manager_id(), + resource_id(), Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], ApplyMode :: handover @@ -377,7 +377,7 @@ conn_str([{_, _} | Opts], Acc) -> | {error, {recoverable_error, term()}} | {error, term()}. do_query( - InstanceId, + ResourceId, Query, ApplyMode, #{pool_name := PoolName, sql_templates := Templates} = State @@ -385,7 +385,7 @@ do_query( ?TRACE( "SINGLE_QUERY_SYNC", "sqlserver_connector_received", - #{query => Query, connector => InstanceId, state => State} + #{query => Query, connector => ResourceId, state => State} ), %% only insert sql statement for single query and batch query @@ -409,7 +409,7 @@ do_query( ), ?SLOG(error, #{ msg => "sqlserver_connector_do_query_failed", - connector => InstanceId, + connector => ResourceId, query => Query, reason => Reason }), @@ -423,9 +423,9 @@ do_query( end. worker_do_insert( - Conn, SQL, #{resource_opts := ResourceOpts, pool_name := InstanceId} = State + Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State ) -> - LogMeta = #{connector => InstanceId, state => State}, + LogMeta = #{connector => ResourceId, state => State}, try case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of {selected, Rows, _} ->