feat(resource): drop `manager_id()` type
This commit is contained in:
parent
aaef95b1da
commit
4575167607
|
@ -38,7 +38,6 @@
|
|||
}.
|
||||
-type state() :: #{
|
||||
connect_timeout := timer:time(),
|
||||
instance_id := manager_id(),
|
||||
jwt_worker_id := jwt_worker(),
|
||||
max_retries := non_neg_integer(),
|
||||
payload_template := emqx_plugin_libs_rule:tmpl_token(),
|
||||
|
@ -61,9 +60,9 @@ is_buffer_supported() -> false.
|
|||
|
||||
callback_mode() -> async_if_possible.
|
||||
|
||||
-spec on_start(manager_id(), config()) -> {ok, state()} | {error, term()}.
|
||||
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
|
||||
on_start(
|
||||
InstanceId,
|
||||
ResourceId,
|
||||
#{
|
||||
connect_timeout := ConnectTimeout,
|
||||
max_retries := MaxRetries,
|
||||
|
@ -75,7 +74,7 @@ on_start(
|
|||
) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_gcp_pubsub_bridge",
|
||||
connector => InstanceId,
|
||||
connector => ResourceId,
|
||||
config => Config
|
||||
}),
|
||||
%% emulating the emulator behavior
|
||||
|
@ -100,14 +99,13 @@ on_start(
|
|||
#{
|
||||
jwt_worker_id := JWTWorkerId,
|
||||
project_id := ProjectId
|
||||
} = ensure_jwt_worker(InstanceId, Config),
|
||||
} = ensure_jwt_worker(ResourceId, Config),
|
||||
State = #{
|
||||
connect_timeout => ConnectTimeout,
|
||||
instance_id => InstanceId,
|
||||
jwt_worker_id => JWTWorkerId,
|
||||
max_retries => MaxRetries,
|
||||
payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
|
||||
pool_name => InstanceId,
|
||||
pool_name => ResourceId,
|
||||
project_id => ProjectId,
|
||||
pubsub_topic => PubSubTopic,
|
||||
request_timeout => RequestTimeout
|
||||
|
@ -115,39 +113,39 @@ on_start(
|
|||
?tp(
|
||||
gcp_pubsub_on_start_before_starting_pool,
|
||||
#{
|
||||
instance_id => InstanceId,
|
||||
pool_name => InstanceId,
|
||||
resource_id => ResourceId,
|
||||
pool_name => ResourceId,
|
||||
pool_opts => PoolOpts
|
||||
}
|
||||
),
|
||||
?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => InstanceId}),
|
||||
case ehttpc_sup:start_pool(InstanceId, PoolOpts) of
|
||||
?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => ResourceId}),
|
||||
case ehttpc_sup:start_pool(ResourceId, PoolOpts) of
|
||||
{ok, _} ->
|
||||
{ok, State};
|
||||
{error, {already_started, _}} ->
|
||||
?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => InstanceId}),
|
||||
?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => ResourceId}),
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
?tp(gcp_pubsub_ehttpc_pool_start_failure, #{
|
||||
pool_name => InstanceId,
|
||||
pool_name => ResourceId,
|
||||
reason => Reason
|
||||
}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
|
||||
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
|
||||
on_stop(
|
||||
InstanceId,
|
||||
_State = #{jwt_worker_id := JWTWorkerId, pool_name := PoolName}
|
||||
ResourceId,
|
||||
_State = #{jwt_worker_id := JWTWorkerId}
|
||||
) ->
|
||||
?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}),
|
||||
?tp(gcp_pubsub_stop, #{resource_id => ResourceId, jwt_worker_id => JWTWorkerId}),
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_gcp_pubsub_bridge",
|
||||
connector => InstanceId
|
||||
connector => ResourceId
|
||||
}),
|
||||
emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
|
||||
emqx_connector_jwt:delete_jwt(?JWT_TABLE, InstanceId),
|
||||
ehttpc_sup:stop_pool(PoolName).
|
||||
emqx_connector_jwt:delete_jwt(?JWT_TABLE, ResourceId),
|
||||
ehttpc_sup:stop_pool(ResourceId).
|
||||
|
||||
-spec on_query(
|
||||
resource_id(),
|
||||
|
@ -213,9 +211,9 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
|
|||
),
|
||||
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
|
||||
|
||||
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
|
||||
on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = State) ->
|
||||
case do_get_status(InstanceId, PoolName, Timeout) of
|
||||
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
|
||||
on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
|
||||
case do_get_status(ResourceId, Timeout) of
|
||||
true ->
|
||||
connected;
|
||||
false ->
|
||||
|
@ -230,12 +228,12 @@ on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} =
|
|||
%% Helper fns
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
-spec ensure_jwt_worker(manager_id(), config()) ->
|
||||
-spec ensure_jwt_worker(resource_id(), config()) ->
|
||||
#{
|
||||
jwt_worker_id := jwt_worker(),
|
||||
project_id := binary()
|
||||
}.
|
||||
ensure_jwt_worker(InstanceId, #{
|
||||
ensure_jwt_worker(ResourceId, #{
|
||||
service_account_json := ServiceAccountJSON
|
||||
}) ->
|
||||
#{
|
||||
|
@ -250,7 +248,7 @@ ensure_jwt_worker(InstanceId, #{
|
|||
Alg = <<"RS256">>,
|
||||
Config = #{
|
||||
private_key => PrivateKeyPEM,
|
||||
resource_id => InstanceId,
|
||||
resource_id => ResourceId,
|
||||
expiration => ExpirationMS,
|
||||
table => ?JWT_TABLE,
|
||||
iss => ServiceAccountEmail,
|
||||
|
@ -260,14 +258,14 @@ ensure_jwt_worker(InstanceId, #{
|
|||
alg => Alg
|
||||
},
|
||||
|
||||
JWTWorkerId = <<"gcp_pubsub_jwt_worker:", InstanceId/binary>>,
|
||||
JWTWorkerId = <<"gcp_pubsub_jwt_worker:", ResourceId/binary>>,
|
||||
Worker =
|
||||
case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of
|
||||
{ok, Worker0} ->
|
||||
Worker0;
|
||||
Error ->
|
||||
?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{
|
||||
connector => InstanceId,
|
||||
connector => ResourceId,
|
||||
reason => Error
|
||||
}),
|
||||
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
|
||||
|
@ -281,18 +279,18 @@ ensure_jwt_worker(InstanceId, #{
|
|||
%% produced by the worker.
|
||||
receive
|
||||
{Ref, token_created} ->
|
||||
?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => InstanceId}),
|
||||
?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => ResourceId}),
|
||||
demonitor(MRef, [flush]),
|
||||
ok;
|
||||
{'DOWN', MRef, process, Worker, Reason} ->
|
||||
?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{
|
||||
connector => InstanceId,
|
||||
connector => ResourceId,
|
||||
reason => Reason
|
||||
}),
|
||||
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
|
||||
throw(failed_to_start_jwt_worker)
|
||||
after 10_000 ->
|
||||
?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => InstanceId}),
|
||||
?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => ResourceId}),
|
||||
demonitor(MRef, [flush]),
|
||||
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
|
||||
throw(timeout_creating_jwt)
|
||||
|
@ -325,8 +323,8 @@ publish_path(
|
|||
<<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
|
||||
|
||||
-spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}].
|
||||
get_jwt_authorization_header(InstanceId) ->
|
||||
case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, InstanceId) of
|
||||
get_jwt_authorization_header(ResourceId) ->
|
||||
case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, ResourceId) of
|
||||
%% Since we synchronize the JWT creation during resource start
|
||||
%% (see `on_start/2'), this will be always be populated.
|
||||
{ok, JWT} ->
|
||||
|
@ -345,7 +343,6 @@ get_jwt_authorization_header(InstanceId) ->
|
|||
do_send_requests_sync(State, Requests, ResourceId) ->
|
||||
#{
|
||||
pool_name := PoolName,
|
||||
instance_id := InstanceId,
|
||||
max_retries := MaxRetries,
|
||||
request_timeout := RequestTimeout
|
||||
} = State,
|
||||
|
@ -353,12 +350,11 @@ do_send_requests_sync(State, Requests, ResourceId) ->
|
|||
gcp_pubsub_bridge_do_send_requests,
|
||||
#{
|
||||
query_mode => sync,
|
||||
instance_id => InstanceId,
|
||||
resource_id => ResourceId,
|
||||
requests => Requests
|
||||
}
|
||||
),
|
||||
Headers = get_jwt_authorization_header(InstanceId),
|
||||
Headers = get_jwt_authorization_header(ResourceId),
|
||||
Payloads =
|
||||
lists:map(
|
||||
fun({send_message, Selected}) ->
|
||||
|
@ -471,19 +467,17 @@ do_send_requests_sync(State, Requests, ResourceId) ->
|
|||
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
|
||||
#{
|
||||
pool_name := PoolName,
|
||||
instance_id := InstanceId,
|
||||
request_timeout := RequestTimeout
|
||||
} = State,
|
||||
?tp(
|
||||
gcp_pubsub_bridge_do_send_requests,
|
||||
#{
|
||||
query_mode => async,
|
||||
instance_id => InstanceId,
|
||||
resource_id => ResourceId,
|
||||
requests => Requests
|
||||
}
|
||||
),
|
||||
Headers = get_jwt_authorization_header(InstanceId),
|
||||
Headers = get_jwt_authorization_header(ResourceId),
|
||||
Payloads =
|
||||
lists:map(
|
||||
fun({send_message, Selected}) ->
|
||||
|
@ -541,9 +535,9 @@ reply_delegator(_ResourceId, ReplyFunAndArgs, Result) ->
|
|||
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
|
||||
end.
|
||||
|
||||
-spec do_get_status(manager_id(), binary(), timer:time()) -> boolean().
|
||||
do_get_status(InstanceId, PoolName, Timeout) ->
|
||||
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
|
||||
-spec do_get_status(resource_id(), timer:time()) -> boolean().
|
||||
do_get_status(ResourceId, Timeout) ->
|
||||
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ResourceId)],
|
||||
DoPerWorker =
|
||||
fun(Worker) ->
|
||||
case ehttpc:health_check(Worker, Timeout) of
|
||||
|
@ -552,7 +546,7 @@ do_get_status(InstanceId, PoolName, Timeout) ->
|
|||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "ehttpc_health_check_failed",
|
||||
instance_id => InstanceId,
|
||||
connector => ResourceId,
|
||||
reason => Reason,
|
||||
worker => Worker
|
||||
}),
|
||||
|
|
|
@ -114,8 +114,8 @@ callback_mode() ->
|
|||
is_buffer_supported() ->
|
||||
true.
|
||||
|
||||
-spec on_start(manager_id(), config()) -> {ok, state()}.
|
||||
on_start(InstanceId, Config) ->
|
||||
-spec on_start(resource_id(), config()) -> {ok, state()}.
|
||||
on_start(ResourceId, Config) ->
|
||||
#{
|
||||
authentication := Auth,
|
||||
bootstrap_hosts := BootstrapHosts0,
|
||||
|
@ -133,7 +133,7 @@ on_start(InstanceId, Config) ->
|
|||
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
|
||||
KafkaType = kafka_consumer,
|
||||
%% Note: this is distinct per node.
|
||||
ClientID = make_client_id(InstanceId, KafkaType, BridgeName),
|
||||
ClientID = make_client_id(ResourceId, KafkaType, BridgeName),
|
||||
ClientOpts0 =
|
||||
case Auth of
|
||||
none -> [];
|
||||
|
@ -144,26 +144,26 @@ on_start(InstanceId, Config) ->
|
|||
ok ->
|
||||
?tp(
|
||||
kafka_consumer_client_started,
|
||||
#{client_id => ClientID, instance_id => InstanceId}
|
||||
#{client_id => ClientID, resource_id => ResourceId}
|
||||
),
|
||||
?SLOG(info, #{
|
||||
msg => "kafka_consumer_client_started",
|
||||
instance_id => InstanceId,
|
||||
resource_id => ResourceId,
|
||||
kafka_hosts => BootstrapHosts
|
||||
});
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_start_kafka_consumer_client",
|
||||
instance_id => InstanceId,
|
||||
resource_id => ResourceId,
|
||||
kafka_hosts => BootstrapHosts,
|
||||
reason => emqx_utils:redact(Reason)
|
||||
}),
|
||||
throw(?CLIENT_DOWN_MESSAGE)
|
||||
end,
|
||||
start_consumer(Config, InstanceId, ClientID).
|
||||
start_consumer(Config, ResourceId, ClientID).
|
||||
|
||||
-spec on_stop(manager_id(), state()) -> ok.
|
||||
on_stop(_InstanceID, State) ->
|
||||
-spec on_stop(resource_id(), state()) -> ok.
|
||||
on_stop(_ResourceID, State) ->
|
||||
#{
|
||||
subscriber_id := SubscriberId,
|
||||
kafka_client_id := ClientID
|
||||
|
@ -172,8 +172,8 @@ on_stop(_InstanceID, State) ->
|
|||
stop_client(ClientID),
|
||||
ok.
|
||||
|
||||
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
|
||||
on_get_status(_InstanceID, State) ->
|
||||
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
|
||||
on_get_status(_ResourceID, State) ->
|
||||
#{
|
||||
subscriber_id := SubscriberId,
|
||||
kafka_client_id := ClientID,
|
||||
|
@ -271,8 +271,8 @@ ensure_consumer_supervisor_started() ->
|
|||
ok
|
||||
end.
|
||||
|
||||
-spec start_consumer(config(), manager_id(), brod:client_id()) -> {ok, state()}.
|
||||
start_consumer(Config, InstanceId, ClientID) ->
|
||||
-spec start_consumer(config(), resource_id(), brod:client_id()) -> {ok, state()}.
|
||||
start_consumer(Config, ResourceId, ClientID) ->
|
||||
#{
|
||||
bootstrap_hosts := BootstrapHosts0,
|
||||
bridge_name := BridgeName,
|
||||
|
@ -292,7 +292,7 @@ start_consumer(Config, InstanceId, ClientID) ->
|
|||
InitialState = #{
|
||||
key_encoding_mode => KeyEncodingMode,
|
||||
hookpoint => Hookpoint,
|
||||
resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
|
||||
resource_id => ResourceId,
|
||||
topic_mapping => TopicMapping,
|
||||
value_encoding_mode => ValueEncodingMode
|
||||
},
|
||||
|
@ -337,7 +337,7 @@ start_consumer(Config, InstanceId, ClientID) ->
|
|||
{ok, _ConsumerPid} ->
|
||||
?tp(
|
||||
kafka_consumer_subscriber_started,
|
||||
#{instance_id => InstanceId, subscriber_id => SubscriberId}
|
||||
#{resource_id => ResourceId, subscriber_id => SubscriberId}
|
||||
),
|
||||
{ok, #{
|
||||
subscriber_id => SubscriberId,
|
||||
|
@ -347,7 +347,7 @@ start_consumer(Config, InstanceId, ClientID) ->
|
|||
{error, Reason2} ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_start_kafka_consumer",
|
||||
instance_id => InstanceId,
|
||||
resource_id => ResourceId,
|
||||
kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
|
||||
reason => emqx_utils:redact(Reason2)
|
||||
}),
|
||||
|
@ -471,19 +471,19 @@ consumer_group_id(BridgeName0) ->
|
|||
BridgeName = to_bin(BridgeName0),
|
||||
<<"emqx-kafka-consumer-", BridgeName/binary>>.
|
||||
|
||||
-spec is_dry_run(manager_id()) -> boolean().
|
||||
is_dry_run(InstanceId) ->
|
||||
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
|
||||
-spec is_dry_run(resource_id()) -> boolean().
|
||||
is_dry_run(ResourceId) ->
|
||||
TestIdStart = string:find(ResourceId, ?TEST_ID_PREFIX),
|
||||
case TestIdStart of
|
||||
nomatch ->
|
||||
false;
|
||||
_ ->
|
||||
string:equal(TestIdStart, InstanceId)
|
||||
string:equal(TestIdStart, ResourceId)
|
||||
end.
|
||||
|
||||
-spec make_client_id(manager_id(), kafka_consumer, atom() | binary()) -> atom().
|
||||
make_client_id(InstanceId, KafkaType, KafkaName) ->
|
||||
case is_dry_run(InstanceId) of
|
||||
-spec make_client_id(resource_id(), kafka_consumer, atom() | binary()) -> atom().
|
||||
make_client_id(ResourceId, KafkaType, KafkaName) ->
|
||||
case is_dry_run(ResourceId) of
|
||||
false ->
|
||||
ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName),
|
||||
binary_to_atom(ClientID0);
|
||||
|
|
|
@ -70,7 +70,7 @@ callback_mode() -> async_if_possible.
|
|||
%% workers.
|
||||
is_buffer_supported() -> true.
|
||||
|
||||
-spec on_start(manager_id(), config()) -> {ok, state()}.
|
||||
-spec on_start(resource_id(), config()) -> {ok, state()}.
|
||||
on_start(InstanceId, Config) ->
|
||||
#{
|
||||
authentication := _Auth,
|
||||
|
@ -106,7 +106,7 @@ on_start(InstanceId, Config) ->
|
|||
end,
|
||||
start_producer(Config, InstanceId, ClientId, ClientOpts).
|
||||
|
||||
-spec on_stop(manager_id(), state()) -> ok.
|
||||
-spec on_stop(resource_id(), state()) -> ok.
|
||||
on_stop(_InstanceId, State) ->
|
||||
#{
|
||||
pulsar_client_id := ClientId,
|
||||
|
@ -117,7 +117,7 @@ on_stop(_InstanceId, State) ->
|
|||
?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
|
||||
ok.
|
||||
|
||||
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
|
||||
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
|
||||
on_get_status(_InstanceId, State = #{}) ->
|
||||
#{
|
||||
pulsar_client_id := ClientId,
|
||||
|
@ -144,7 +144,7 @@ on_get_status(_InstanceId, _State) ->
|
|||
%% create the bridge is not quite finished, `State = undefined'.
|
||||
connecting.
|
||||
|
||||
-spec on_query(manager_id(), {send_message, map()}, state()) ->
|
||||
-spec on_query(resource_id(), {send_message, map()}, state()) ->
|
||||
{ok, term()}
|
||||
| {error, timeout}
|
||||
| {error, term()}.
|
||||
|
@ -163,7 +163,7 @@ on_query(_InstanceId, {send_message, Message}, State) ->
|
|||
end.
|
||||
|
||||
-spec on_query_async(
|
||||
manager_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
|
||||
resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
|
||||
) ->
|
||||
{ok, pid()}.
|
||||
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
|
||||
|
@ -203,7 +203,7 @@ format_servers(Servers0) ->
|
|||
Servers1
|
||||
).
|
||||
|
||||
-spec make_client_id(manager_id(), atom() | binary()) -> pulsar_client_id().
|
||||
-spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id().
|
||||
make_client_id(InstanceId, BridgeName) ->
|
||||
case is_dry_run(InstanceId) of
|
||||
true ->
|
||||
|
@ -218,7 +218,7 @@ make_client_id(InstanceId, BridgeName) ->
|
|||
binary_to_atom(ClientIdBin)
|
||||
end.
|
||||
|
||||
-spec is_dry_run(manager_id()) -> boolean().
|
||||
-spec is_dry_run(resource_id()) -> boolean().
|
||||
is_dry_run(InstanceId) ->
|
||||
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
|
||||
case TestIdStart of
|
||||
|
@ -255,7 +255,7 @@ producer_name(ClientId) ->
|
|||
])
|
||||
).
|
||||
|
||||
-spec start_producer(config(), manager_id(), pulsar_client_id(), map()) -> {ok, state()}.
|
||||
-spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}.
|
||||
start_producer(Config, InstanceId, ClientId, ClientOpts) ->
|
||||
#{
|
||||
conn_opts := ConnOpts,
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-type resource_type() :: module().
|
||||
-type resource_id() :: binary().
|
||||
-type manager_id() :: binary().
|
||||
-type raw_resource_config() :: binary() | raw_term_resource_config().
|
||||
-type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
|
||||
-type resource_config() :: term().
|
||||
|
|
|
@ -367,9 +367,9 @@ is_buffer_supported(Module) ->
|
|||
|
||||
-spec call_start(resource_id(), module(), resource_config()) ->
|
||||
{ok, resource_state()} | {error, Reason :: term()}.
|
||||
call_start(MgrId, Mod, Config) ->
|
||||
call_start(ResId, Mod, Config) ->
|
||||
try
|
||||
Mod:on_start(MgrId, Config)
|
||||
Mod:on_start(ResId, Config)
|
||||
catch
|
||||
throw:Error ->
|
||||
{error, Error};
|
||||
|
@ -382,12 +382,12 @@ call_start(MgrId, Mod, Config) ->
|
|||
| {resource_status(), resource_state()}
|
||||
| {resource_status(), resource_state(), term()}
|
||||
| {error, term()}.
|
||||
call_health_check(MgrId, Mod, ResourceState) ->
|
||||
?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)).
|
||||
call_health_check(ResId, Mod, ResourceState) ->
|
||||
?SAFE_CALL(Mod:on_get_status(ResId, ResourceState)).
|
||||
|
||||
-spec call_stop(resource_id(), module(), resource_state()) -> term().
|
||||
call_stop(MgrId, Mod, ResourceState) ->
|
||||
?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)).
|
||||
call_stop(ResId, Mod, ResourceState) ->
|
||||
?SAFE_CALL(Mod:on_stop(ResId, ResourceState)).
|
||||
|
||||
-spec check_config(resource_type(), raw_resource_config()) ->
|
||||
{ok, resource_config()} | {error, term()}.
|
||||
|
|
|
@ -174,7 +174,7 @@ callback_mode() -> async_if_possible.
|
|||
is_buffer_supported() -> false.
|
||||
|
||||
on_start(
|
||||
InstanceId = PoolName,
|
||||
ResourceId = PoolName,
|
||||
#{
|
||||
server := Server,
|
||||
username := Username,
|
||||
|
@ -187,7 +187,7 @@ on_start(
|
|||
) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_sqlserver_connector",
|
||||
connector => InstanceId,
|
||||
connector => ResourceId,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
|
||||
|
@ -212,7 +212,7 @@ on_start(
|
|||
],
|
||||
|
||||
State = #{
|
||||
%% also InstanceId
|
||||
%% also ResourceId
|
||||
pool_name => PoolName,
|
||||
sql_templates => parse_sql_template(Config),
|
||||
resource_opts => ResourceOpts
|
||||
|
@ -228,15 +228,15 @@ on_start(
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
on_stop(InstanceId, #{pool_name := PoolName} = _State) ->
|
||||
on_stop(ResourceId, _State) ->
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_sqlserver_connector",
|
||||
connector => InstanceId
|
||||
connector => ResourceId
|
||||
}),
|
||||
emqx_resource_pool:stop(PoolName).
|
||||
emqx_resource_pool:stop(ResourceId).
|
||||
|
||||
-spec on_query(
|
||||
manager_id(),
|
||||
resource_id(),
|
||||
{?ACTION_SEND_MESSAGE, map()},
|
||||
state()
|
||||
) ->
|
||||
|
@ -244,16 +244,16 @@ on_stop(InstanceId, #{pool_name := PoolName} = _State) ->
|
|||
| {ok, list()}
|
||||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
||||
on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
||||
?TRACE(
|
||||
"SINGLE_QUERY_SYNC",
|
||||
"bridge_sqlserver_received",
|
||||
#{requests => Query, connector => InstanceId, state => State}
|
||||
#{requests => Query, connector => ResourceId, state => State}
|
||||
),
|
||||
do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State).
|
||||
do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State).
|
||||
|
||||
-spec on_query_async(
|
||||
manager_id(),
|
||||
resource_id(),
|
||||
{?ACTION_SEND_MESSAGE, map()},
|
||||
{ReplyFun :: function(), Args :: list()},
|
||||
state()
|
||||
|
@ -261,7 +261,7 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
|||
{ok, any()}
|
||||
| {error, term()}.
|
||||
on_query_async(
|
||||
InstanceId,
|
||||
ResourceId,
|
||||
{?ACTION_SEND_MESSAGE, _Msg} = Query,
|
||||
ReplyFunAndArgs,
|
||||
State
|
||||
|
@ -269,12 +269,12 @@ on_query_async(
|
|||
?TRACE(
|
||||
"SINGLE_QUERY_ASYNC",
|
||||
"bridge_sqlserver_received",
|
||||
#{requests => Query, connector => InstanceId, state => State}
|
||||
#{requests => Query, connector => ResourceId, state => State}
|
||||
),
|
||||
do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
|
||||
do_query(ResourceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
|
||||
|
||||
-spec on_batch_query(
|
||||
manager_id(),
|
||||
resource_id(),
|
||||
[{?ACTION_SEND_MESSAGE, map()}],
|
||||
state()
|
||||
) ->
|
||||
|
@ -282,29 +282,29 @@ on_query_async(
|
|||
| {ok, list()}
|
||||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
on_batch_query(InstanceId, BatchRequests, State) ->
|
||||
on_batch_query(ResourceId, BatchRequests, State) ->
|
||||
?TRACE(
|
||||
"BATCH_QUERY_SYNC",
|
||||
"bridge_sqlserver_received",
|
||||
#{requests => BatchRequests, connector => InstanceId, state => State}
|
||||
#{requests => BatchRequests, connector => ResourceId, state => State}
|
||||
),
|
||||
do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State).
|
||||
do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
|
||||
|
||||
-spec on_batch_query_async(
|
||||
manager_id(),
|
||||
resource_id(),
|
||||
[{?ACTION_SEND_MESSAGE, map()}],
|
||||
{ReplyFun :: function(), Args :: list()},
|
||||
state()
|
||||
) -> {ok, any()}.
|
||||
on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) ->
|
||||
on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
|
||||
?TRACE(
|
||||
"BATCH_QUERY_ASYNC",
|
||||
"bridge_sqlserver_received",
|
||||
#{requests => Requests, connector => InstanceId, state => State}
|
||||
#{requests => Requests, connector => ResourceId, state => State}
|
||||
),
|
||||
do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
|
||||
do_query(ResourceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
|
||||
|
||||
on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
|
||||
on_get_status(_ResourceId, #{pool_name := PoolName} = _State) ->
|
||||
Health = emqx_resource_pool:health_check_workers(
|
||||
PoolName,
|
||||
{?MODULE, do_get_status, []}
|
||||
|
@ -366,7 +366,7 @@ conn_str([{_, _} | Opts], Acc) ->
|
|||
|
||||
%% Sync & Async query with singe & batch sql statement
|
||||
-spec do_query(
|
||||
manager_id(),
|
||||
resource_id(),
|
||||
Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
|
||||
ApplyMode ::
|
||||
handover
|
||||
|
@ -377,7 +377,7 @@ conn_str([{_, _} | Opts], Acc) ->
|
|||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
do_query(
|
||||
InstanceId,
|
||||
ResourceId,
|
||||
Query,
|
||||
ApplyMode,
|
||||
#{pool_name := PoolName, sql_templates := Templates} = State
|
||||
|
@ -385,7 +385,7 @@ do_query(
|
|||
?TRACE(
|
||||
"SINGLE_QUERY_SYNC",
|
||||
"sqlserver_connector_received",
|
||||
#{query => Query, connector => InstanceId, state => State}
|
||||
#{query => Query, connector => ResourceId, state => State}
|
||||
),
|
||||
|
||||
%% only insert sql statement for single query and batch query
|
||||
|
@ -409,7 +409,7 @@ do_query(
|
|||
),
|
||||
?SLOG(error, #{
|
||||
msg => "sqlserver_connector_do_query_failed",
|
||||
connector => InstanceId,
|
||||
connector => ResourceId,
|
||||
query => Query,
|
||||
reason => Reason
|
||||
}),
|
||||
|
@ -423,9 +423,9 @@ do_query(
|
|||
end.
|
||||
|
||||
worker_do_insert(
|
||||
Conn, SQL, #{resource_opts := ResourceOpts, pool_name := InstanceId} = State
|
||||
Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State
|
||||
) ->
|
||||
LogMeta = #{connector => InstanceId, state => State},
|
||||
LogMeta = #{connector => ResourceId, state => State},
|
||||
try
|
||||
case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of
|
||||
{selected, Rows, _} ->
|
||||
|
|
Loading…
Reference in New Issue