Merge pull request #10362 from ft/EMQX-9257/resource-pool

feat(resource): stop adding uniqueness to manager ids
This commit is contained in:
Andrew Mayorov 2023-05-02 18:51:20 +03:00 committed by GitHub
commit 6a043ff09a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 191 additions and 260 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [ {application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"}, {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.0"}, {vsn, "0.1.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -38,7 +38,6 @@
}. }.
-type state() :: #{ -type state() :: #{
connect_timeout := timer:time(), connect_timeout := timer:time(),
instance_id := manager_id(),
jwt_worker_id := jwt_worker(), jwt_worker_id := jwt_worker(),
max_retries := non_neg_integer(), max_retries := non_neg_integer(),
payload_template := emqx_plugin_libs_rule:tmpl_token(), payload_template := emqx_plugin_libs_rule:tmpl_token(),
@ -61,9 +60,9 @@ is_buffer_supported() -> false.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
-spec on_start(manager_id(), config()) -> {ok, state()} | {error, term()}. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
on_start( on_start(
InstanceId, ResourceId,
#{ #{
connect_timeout := ConnectTimeout, connect_timeout := ConnectTimeout,
max_retries := MaxRetries, max_retries := MaxRetries,
@ -75,7 +74,7 @@ on_start(
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_gcp_pubsub_bridge", msg => "starting_gcp_pubsub_bridge",
connector => InstanceId, connector => ResourceId,
config => Config config => Config
}), }),
%% emulating the emulator behavior %% emulating the emulator behavior
@ -100,14 +99,13 @@ on_start(
#{ #{
jwt_worker_id := JWTWorkerId, jwt_worker_id := JWTWorkerId,
project_id := ProjectId project_id := ProjectId
} = ensure_jwt_worker(InstanceId, Config), } = ensure_jwt_worker(ResourceId, Config),
State = #{ State = #{
connect_timeout => ConnectTimeout, connect_timeout => ConnectTimeout,
instance_id => InstanceId,
jwt_worker_id => JWTWorkerId, jwt_worker_id => JWTWorkerId,
max_retries => MaxRetries, max_retries => MaxRetries,
payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
pool_name => InstanceId, pool_name => ResourceId,
project_id => ProjectId, project_id => ProjectId,
pubsub_topic => PubSubTopic, pubsub_topic => PubSubTopic,
request_timeout => RequestTimeout request_timeout => RequestTimeout
@ -115,39 +113,39 @@ on_start(
?tp( ?tp(
gcp_pubsub_on_start_before_starting_pool, gcp_pubsub_on_start_before_starting_pool,
#{ #{
instance_id => InstanceId, resource_id => ResourceId,
pool_name => InstanceId, pool_name => ResourceId,
pool_opts => PoolOpts pool_opts => PoolOpts
} }
), ),
?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => InstanceId}), ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => ResourceId}),
case ehttpc_sup:start_pool(InstanceId, PoolOpts) of case ehttpc_sup:start_pool(ResourceId, PoolOpts) of
{ok, _} -> {ok, _} ->
{ok, State}; {ok, State};
{error, {already_started, _}} -> {error, {already_started, _}} ->
?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => InstanceId}), ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => ResourceId}),
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
?tp(gcp_pubsub_ehttpc_pool_start_failure, #{ ?tp(gcp_pubsub_ehttpc_pool_start_failure, #{
pool_name => InstanceId, pool_name => ResourceId,
reason => Reason reason => Reason
}), }),
{error, Reason} {error, Reason}
end. end.
-spec on_stop(manager_id(), state()) -> ok | {error, term()}. -spec on_stop(resource_id(), state()) -> ok | {error, term()}.
on_stop( on_stop(
InstanceId, ResourceId,
_State = #{jwt_worker_id := JWTWorkerId, pool_name := PoolName} _State = #{jwt_worker_id := JWTWorkerId}
) -> ) ->
?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}), ?tp(gcp_pubsub_stop, #{resource_id => ResourceId, jwt_worker_id => JWTWorkerId}),
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_gcp_pubsub_bridge", msg => "stopping_gcp_pubsub_bridge",
connector => InstanceId connector => ResourceId
}), }),
emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
emqx_connector_jwt:delete_jwt(?JWT_TABLE, InstanceId), emqx_connector_jwt:delete_jwt(?JWT_TABLE, ResourceId),
ehttpc_sup:stop_pool(PoolName). ehttpc_sup:stop_pool(ResourceId).
-spec on_query( -spec on_query(
resource_id(), resource_id(),
@ -213,9 +211,9 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
), ),
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
-spec on_get_status(manager_id(), state()) -> connected | disconnected. -spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = State) -> on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
case do_get_status(InstanceId, PoolName, Timeout) of case do_get_status(ResourceId, Timeout) of
true -> true ->
connected; connected;
false -> false ->
@ -230,12 +228,12 @@ on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} =
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
-spec ensure_jwt_worker(manager_id(), config()) -> -spec ensure_jwt_worker(resource_id(), config()) ->
#{ #{
jwt_worker_id := jwt_worker(), jwt_worker_id := jwt_worker(),
project_id := binary() project_id := binary()
}. }.
ensure_jwt_worker(InstanceId, #{ ensure_jwt_worker(ResourceId, #{
service_account_json := ServiceAccountJSON service_account_json := ServiceAccountJSON
}) -> }) ->
#{ #{
@ -250,7 +248,7 @@ ensure_jwt_worker(InstanceId, #{
Alg = <<"RS256">>, Alg = <<"RS256">>,
Config = #{ Config = #{
private_key => PrivateKeyPEM, private_key => PrivateKeyPEM,
resource_id => InstanceId, resource_id => ResourceId,
expiration => ExpirationMS, expiration => ExpirationMS,
table => ?JWT_TABLE, table => ?JWT_TABLE,
iss => ServiceAccountEmail, iss => ServiceAccountEmail,
@ -260,14 +258,14 @@ ensure_jwt_worker(InstanceId, #{
alg => Alg alg => Alg
}, },
JWTWorkerId = <<"gcp_pubsub_jwt_worker:", InstanceId/binary>>, JWTWorkerId = <<"gcp_pubsub_jwt_worker:", ResourceId/binary>>,
Worker = Worker =
case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of
{ok, Worker0} -> {ok, Worker0} ->
Worker0; Worker0;
Error -> Error ->
?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{
connector => InstanceId, connector => ResourceId,
reason => Error reason => Error
}), }),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
@ -281,18 +279,18 @@ ensure_jwt_worker(InstanceId, #{
%% produced by the worker. %% produced by the worker.
receive receive
{Ref, token_created} -> {Ref, token_created} ->
?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => InstanceId}), ?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => ResourceId}),
demonitor(MRef, [flush]), demonitor(MRef, [flush]),
ok; ok;
{'DOWN', MRef, process, Worker, Reason} -> {'DOWN', MRef, process, Worker, Reason} ->
?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{
connector => InstanceId, connector => ResourceId,
reason => Reason reason => Reason
}), }),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
throw(failed_to_start_jwt_worker) throw(failed_to_start_jwt_worker)
after 10_000 -> after 10_000 ->
?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => InstanceId}), ?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => ResourceId}),
demonitor(MRef, [flush]), demonitor(MRef, [flush]),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
throw(timeout_creating_jwt) throw(timeout_creating_jwt)
@ -325,8 +323,8 @@ publish_path(
<<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
-spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}]. -spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}].
get_jwt_authorization_header(InstanceId) -> get_jwt_authorization_header(ResourceId) ->
case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, InstanceId) of case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, ResourceId) of
%% Since we synchronize the JWT creation during resource start %% Since we synchronize the JWT creation during resource start
%% (see `on_start/2'), this will be always be populated. %% (see `on_start/2'), this will be always be populated.
{ok, JWT} -> {ok, JWT} ->
@ -345,7 +343,6 @@ get_jwt_authorization_header(InstanceId) ->
do_send_requests_sync(State, Requests, ResourceId) -> do_send_requests_sync(State, Requests, ResourceId) ->
#{ #{
pool_name := PoolName, pool_name := PoolName,
instance_id := InstanceId,
max_retries := MaxRetries, max_retries := MaxRetries,
request_timeout := RequestTimeout request_timeout := RequestTimeout
} = State, } = State,
@ -353,12 +350,11 @@ do_send_requests_sync(State, Requests, ResourceId) ->
gcp_pubsub_bridge_do_send_requests, gcp_pubsub_bridge_do_send_requests,
#{ #{
query_mode => sync, query_mode => sync,
instance_id => InstanceId,
resource_id => ResourceId, resource_id => ResourceId,
requests => Requests requests => Requests
} }
), ),
Headers = get_jwt_authorization_header(InstanceId), Headers = get_jwt_authorization_header(ResourceId),
Payloads = Payloads =
lists:map( lists:map(
fun({send_message, Selected}) -> fun({send_message, Selected}) ->
@ -471,19 +467,17 @@ do_send_requests_sync(State, Requests, ResourceId) ->
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
#{ #{
pool_name := PoolName, pool_name := PoolName,
instance_id := InstanceId,
request_timeout := RequestTimeout request_timeout := RequestTimeout
} = State, } = State,
?tp( ?tp(
gcp_pubsub_bridge_do_send_requests, gcp_pubsub_bridge_do_send_requests,
#{ #{
query_mode => async, query_mode => async,
instance_id => InstanceId,
resource_id => ResourceId, resource_id => ResourceId,
requests => Requests requests => Requests
} }
), ),
Headers = get_jwt_authorization_header(InstanceId), Headers = get_jwt_authorization_header(ResourceId),
Payloads = Payloads =
lists:map( lists:map(
fun({send_message, Selected}) -> fun({send_message, Selected}) ->
@ -541,9 +535,9 @@ reply_delegator(_ResourceId, ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end. end.
-spec do_get_status(manager_id(), binary(), timer:time()) -> boolean(). -spec do_get_status(resource_id(), timer:time()) -> boolean().
do_get_status(InstanceId, PoolName, Timeout) -> do_get_status(ResourceId, Timeout) ->
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ResourceId)],
DoPerWorker = DoPerWorker =
fun(Worker) -> fun(Worker) ->
case ehttpc:health_check(Worker, Timeout) of case ehttpc:health_check(Worker, Timeout) of
@ -552,7 +546,7 @@ do_get_status(InstanceId, PoolName, Timeout) ->
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "ehttpc_health_check_failed", msg => "ehttpc_health_check_failed",
instance_id => InstanceId, connector => ResourceId,
reason => Reason, reason => Reason,
worker => Worker worker => Worker
}), }),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kafka, [ {application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"}, {description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, [emqx_bridge_kafka_consumer_sup]}, {registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [ {applications, [
kernel, kernel,

View File

@ -114,8 +114,8 @@ callback_mode() ->
is_buffer_supported() -> is_buffer_supported() ->
true. true.
-spec on_start(manager_id(), config()) -> {ok, state()}. -spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) -> on_start(ResourceId, Config) ->
#{ #{
authentication := Auth, authentication := Auth,
bootstrap_hosts := BootstrapHosts0, bootstrap_hosts := BootstrapHosts0,
@ -133,7 +133,7 @@ on_start(InstanceId, Config) ->
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
KafkaType = kafka_consumer, KafkaType = kafka_consumer,
%% Note: this is distinct per node. %% Note: this is distinct per node.
ClientID = make_client_id(InstanceId, KafkaType, BridgeName), ClientID = make_client_id(ResourceId, KafkaType, BridgeName),
ClientOpts0 = ClientOpts0 =
case Auth of case Auth of
none -> []; none -> [];
@ -144,26 +144,26 @@ on_start(InstanceId, Config) ->
ok -> ok ->
?tp( ?tp(
kafka_consumer_client_started, kafka_consumer_client_started,
#{client_id => ClientID, instance_id => InstanceId} #{client_id => ClientID, resource_id => ResourceId}
), ),
?SLOG(info, #{ ?SLOG(info, #{
msg => "kafka_consumer_client_started", msg => "kafka_consumer_client_started",
instance_id => InstanceId, resource_id => ResourceId,
kafka_hosts => BootstrapHosts kafka_hosts => BootstrapHosts
}); });
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_start_kafka_consumer_client", msg => "failed_to_start_kafka_consumer_client",
instance_id => InstanceId, resource_id => ResourceId,
kafka_hosts => BootstrapHosts, kafka_hosts => BootstrapHosts,
reason => emqx_utils:redact(Reason) reason => emqx_utils:redact(Reason)
}), }),
throw(?CLIENT_DOWN_MESSAGE) throw(?CLIENT_DOWN_MESSAGE)
end, end,
start_consumer(Config, InstanceId, ClientID). start_consumer(Config, ResourceId, ClientID).
-spec on_stop(manager_id(), state()) -> ok. -spec on_stop(resource_id(), state()) -> ok.
on_stop(_InstanceID, State) -> on_stop(_ResourceID, State) ->
#{ #{
subscriber_id := SubscriberId, subscriber_id := SubscriberId,
kafka_client_id := ClientID kafka_client_id := ClientID
@ -172,8 +172,8 @@ on_stop(_InstanceID, State) ->
stop_client(ClientID), stop_client(ClientID),
ok. ok.
-spec on_get_status(manager_id(), state()) -> connected | disconnected. -spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(_InstanceID, State) -> on_get_status(_ResourceID, State) ->
#{ #{
subscriber_id := SubscriberId, subscriber_id := SubscriberId,
kafka_client_id := ClientID, kafka_client_id := ClientID,
@ -271,8 +271,8 @@ ensure_consumer_supervisor_started() ->
ok ok
end. end.
-spec start_consumer(config(), manager_id(), brod:client_id()) -> {ok, state()}. -spec start_consumer(config(), resource_id(), brod:client_id()) -> {ok, state()}.
start_consumer(Config, InstanceId, ClientID) -> start_consumer(Config, ResourceId, ClientID) ->
#{ #{
bootstrap_hosts := BootstrapHosts0, bootstrap_hosts := BootstrapHosts0,
bridge_name := BridgeName, bridge_name := BridgeName,
@ -292,7 +292,7 @@ start_consumer(Config, InstanceId, ClientID) ->
InitialState = #{ InitialState = #{
key_encoding_mode => KeyEncodingMode, key_encoding_mode => KeyEncodingMode,
hookpoint => Hookpoint, hookpoint => Hookpoint,
resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName), resource_id => ResourceId,
topic_mapping => TopicMapping, topic_mapping => TopicMapping,
value_encoding_mode => ValueEncodingMode value_encoding_mode => ValueEncodingMode
}, },
@ -337,7 +337,7 @@ start_consumer(Config, InstanceId, ClientID) ->
{ok, _ConsumerPid} -> {ok, _ConsumerPid} ->
?tp( ?tp(
kafka_consumer_subscriber_started, kafka_consumer_subscriber_started,
#{instance_id => InstanceId, subscriber_id => SubscriberId} #{resource_id => ResourceId, subscriber_id => SubscriberId}
), ),
{ok, #{ {ok, #{
subscriber_id => SubscriberId, subscriber_id => SubscriberId,
@ -347,7 +347,7 @@ start_consumer(Config, InstanceId, ClientID) ->
{error, Reason2} -> {error, Reason2} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_start_kafka_consumer", msg => "failed_to_start_kafka_consumer",
instance_id => InstanceId, resource_id => ResourceId,
kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0), kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
reason => emqx_utils:redact(Reason2) reason => emqx_utils:redact(Reason2)
}), }),
@ -471,19 +471,19 @@ consumer_group_id(BridgeName0) ->
BridgeName = to_bin(BridgeName0), BridgeName = to_bin(BridgeName0),
<<"emqx-kafka-consumer-", BridgeName/binary>>. <<"emqx-kafka-consumer-", BridgeName/binary>>.
-spec is_dry_run(manager_id()) -> boolean(). -spec is_dry_run(resource_id()) -> boolean().
is_dry_run(InstanceId) -> is_dry_run(ResourceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), TestIdStart = string:find(ResourceId, ?TEST_ID_PREFIX),
case TestIdStart of case TestIdStart of
nomatch -> nomatch ->
false; false;
_ -> _ ->
string:equal(TestIdStart, InstanceId) string:equal(TestIdStart, ResourceId)
end. end.
-spec make_client_id(manager_id(), kafka_consumer, atom() | binary()) -> atom(). -spec make_client_id(resource_id(), kafka_consumer, atom() | binary()) -> atom().
make_client_id(InstanceId, KafkaType, KafkaName) -> make_client_id(ResourceId, KafkaType, KafkaName) ->
case is_dry_run(InstanceId) of case is_dry_run(ResourceId) of
false -> false ->
ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName), ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName),
binary_to_atom(ClientID0); binary_to_atom(ClientID0);

View File

@ -70,7 +70,7 @@ callback_mode() -> async_if_possible.
%% workers. %% workers.
is_buffer_supported() -> true. is_buffer_supported() -> true.
-spec on_start(manager_id(), config()) -> {ok, state()}. -spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) -> on_start(InstanceId, Config) ->
#{ #{
authentication := _Auth, authentication := _Auth,
@ -106,7 +106,7 @@ on_start(InstanceId, Config) ->
end, end,
start_producer(Config, InstanceId, ClientId, ClientOpts). start_producer(Config, InstanceId, ClientId, ClientOpts).
-spec on_stop(manager_id(), state()) -> ok. -spec on_stop(resource_id(), state()) -> ok.
on_stop(_InstanceId, State) -> on_stop(_InstanceId, State) ->
#{ #{
pulsar_client_id := ClientId, pulsar_client_id := ClientId,
@ -117,7 +117,7 @@ on_stop(_InstanceId, State) ->
?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}), ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
ok. ok.
-spec on_get_status(manager_id(), state()) -> connected | disconnected. -spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(_InstanceId, State = #{}) -> on_get_status(_InstanceId, State = #{}) ->
#{ #{
pulsar_client_id := ClientId, pulsar_client_id := ClientId,
@ -144,7 +144,7 @@ on_get_status(_InstanceId, _State) ->
%% create the bridge is not quite finished, `State = undefined'. %% create the bridge is not quite finished, `State = undefined'.
connecting. connecting.
-spec on_query(manager_id(), {send_message, map()}, state()) -> -spec on_query(resource_id(), {send_message, map()}, state()) ->
{ok, term()} {ok, term()}
| {error, timeout} | {error, timeout}
| {error, term()}. | {error, term()}.
@ -163,7 +163,7 @@ on_query(_InstanceId, {send_message, Message}, State) ->
end. end.
-spec on_query_async( -spec on_query_async(
manager_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
) -> ) ->
{ok, pid()}. {ok, pid()}.
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
@ -203,7 +203,7 @@ format_servers(Servers0) ->
Servers1 Servers1
). ).
-spec make_client_id(manager_id(), atom() | binary()) -> pulsar_client_id(). -spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id().
make_client_id(InstanceId, BridgeName) -> make_client_id(InstanceId, BridgeName) ->
case is_dry_run(InstanceId) of case is_dry_run(InstanceId) of
true -> true ->
@ -218,7 +218,7 @@ make_client_id(InstanceId, BridgeName) ->
binary_to_atom(ClientIdBin) binary_to_atom(ClientIdBin)
end. end.
-spec is_dry_run(manager_id()) -> boolean(). -spec is_dry_run(resource_id()) -> boolean().
is_dry_run(InstanceId) -> is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of case TestIdStart of
@ -255,7 +255,7 @@ producer_name(ClientId) ->
]) ])
). ).
-spec start_producer(config(), manager_id(), pulsar_client_id(), map()) -> {ok, state()}. -spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}.
start_producer(Config, InstanceId, ClientId, ClientOpts) -> start_producer(Config, InstanceId, ClientId, ClientOpts) ->
#{ #{
conn_opts := ConnOpts, conn_opts := ConnOpts,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_connector, [ {application, emqx_connector, [
{description, "EMQX Data Integration Connectors"}, {description, "EMQX Data Integration Connectors"},
{vsn, "0.1.21"}, {vsn, "0.1.22"},
{registered, []}, {registered, []},
{mod, {emqx_connector_app, []}}, {mod, {emqx_connector_app, []}},
{applications, [ {applications, [

View File

@ -248,13 +248,12 @@ make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
undefined; undefined;
make_sub_confs(undefined, _Conf, _) -> make_sub_confs(undefined, _Conf, _) ->
undefined; undefined;
make_sub_confs(SubRemoteConf, Conf, InstanceId) -> make_sub_confs(SubRemoteConf, Conf, ResourceId) ->
ResId = emqx_resource_manager:manager_id_to_resource_id(InstanceId),
case maps:find(hookpoint, Conf) of case maps:find(hookpoint, Conf) of
error -> error ->
error({no_hookpoint_provided, Conf}); error({no_hookpoint_provided, Conf});
{ok, HookPoint} -> {ok, HookPoint} ->
MFA = {?MODULE, on_message_received, [HookPoint, ResId]}, MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]},
SubRemoteConf#{on_message_received => MFA} SubRemoteConf#{on_message_received => MFA}
end. end.

View File

@ -15,7 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-type resource_type() :: module(). -type resource_type() :: module().
-type resource_id() :: binary(). -type resource_id() :: binary().
-type manager_id() :: binary().
-type raw_resource_config() :: binary() | raw_term_resource_config(). -type raw_resource_config() :: binary() | raw_term_resource_config().
-type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()]. -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
-type resource_config() :: term(). -type resource_config() :: term().

View File

@ -113,7 +113,10 @@
-export([apply_reply_fun/2]). -export([apply_reply_fun/2]).
-export_type([resource_data/0]). -export_type([
resource_id/0,
resource_data/0
]).
-optional_callbacks([ -optional_callbacks([
on_query/3, on_query/3,
@ -362,11 +365,11 @@ is_buffer_supported(Module) ->
false false
end. end.
-spec call_start(manager_id(), module(), resource_config()) -> -spec call_start(resource_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
call_start(MgrId, Mod, Config) -> call_start(ResId, Mod, Config) ->
try try
Mod:on_start(MgrId, Config) Mod:on_start(ResId, Config)
catch catch
throw:Error -> throw:Error ->
{error, Error}; {error, Error};
@ -374,17 +377,17 @@ call_start(MgrId, Mod, Config) ->
{error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}}
end. end.
-spec call_health_check(manager_id(), module(), resource_state()) -> -spec call_health_check(resource_id(), module(), resource_state()) ->
resource_status() resource_status()
| {resource_status(), resource_state()} | {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()} | {resource_status(), resource_state(), term()}
| {error, term()}. | {error, term()}.
call_health_check(MgrId, Mod, ResourceState) -> call_health_check(ResId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)). ?SAFE_CALL(Mod:on_get_status(ResId, ResourceState)).
-spec call_stop(manager_id(), module(), resource_state()) -> term(). -spec call_stop(resource_id(), module(), resource_state()) -> term().
call_stop(MgrId, Mod, ResourceState) -> call_stop(ResId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)). ?SAFE_CALL(Mod:on_stop(ResId, ResourceState)).
-spec check_config(resource_type(), raw_resource_config()) -> -spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}. {ok, resource_config()} | {error, term()}.

View File

@ -52,6 +52,7 @@ init([]) ->
ChildSpecs = [], ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
-spec start_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok.
start_workers(ResId, Opts) -> start_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
_ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]), _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]),
@ -63,6 +64,7 @@ start_workers(ResId, Opts) ->
lists:seq(1, WorkerPoolSize) lists:seq(1, WorkerPoolSize)
). ).
-spec stop_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok.
stop_workers(ResId, Opts) -> stop_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
lists:foreach( lists:foreach(
@ -75,6 +77,7 @@ stop_workers(ResId, Opts) ->
ensure_worker_pool_removed(ResId), ensure_worker_pool_removed(ResId),
ok. ok.
-spec worker_pids(emqx_resource:resource_id()) -> [pid()].
worker_pids(ResId) -> worker_pids(ResId) ->
lists:map( lists:map(
fun({_Name, Pid}) -> fun({_Name, Pid}) ->

View File

@ -42,23 +42,24 @@
]). ]).
-export([ -export([
set_resource_status_connecting/1, set_resource_status_connecting/1
manager_id_to_resource_id/1
]). ]).
% Server % Server
-export([start_link/6]). -export([start_link/5]).
% Behaviour % Behaviour
-export([init/1, callback_mode/0, handle_event/4, terminate/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
% State record % State record
-record(data, { -record(data, {
id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid
}). }).
-type data() :: #data{}. -type data() :: #data{}.
-define(ETS_TABLE, ?MODULE). -define(NAME(ResId), {n, l, {?MODULE, ResId}}).
-define(REF(ResId), {via, gproc, ?NAME(ResId)}).
-define(WAIT_FOR_RESOURCE_DELAY, 100). -define(WAIT_FOR_RESOURCE_DELAY, 100).
-define(T_OPERATION, 5000). -define(T_OPERATION, 5000).
-define(T_LOOKUP, 1000). -define(T_LOOKUP, 1000).
@ -69,13 +70,6 @@
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
make_manager_id(ResId) ->
emqx_resource:generate_id(ResId).
manager_id_to_resource_id(MgrId) ->
[ResId, _Index] = string:split(MgrId, ":", trailing),
ResId.
%% @doc Called from emqx_resource when starting a resource instance. %% @doc Called from emqx_resource when starting a resource instance.
%% %%
%% Triggers the emqx_resource_manager_sup supervisor to actually create %% Triggers the emqx_resource_manager_sup supervisor to actually create
@ -92,8 +86,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
{ok, _Group, Data} -> {ok, _Group, Data} ->
{ok, Data}; {ok, Data};
{error, not_found} -> {error, not_found} ->
MgrId = set_new_owner(ResId), create_and_return_data(ResId, Group, ResourceType, Config, Opts)
create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts)
end. end.
%% @doc Called from emqx_resource when recreating a resource which may or may not exist %% @doc Called from emqx_resource when recreating a resource which may or may not exist
@ -103,23 +96,22 @@ recreate(ResId, ResourceType, NewConfig, Opts) ->
case lookup(ResId) of case lookup(ResId) of
{ok, Group, #{mod := ResourceType, status := _} = _Data} -> {ok, Group, #{mod := ResourceType, status := _} = _Data} ->
_ = remove(ResId, false), _ = remove(ResId, false),
MgrId = set_new_owner(ResId), create_and_return_data(ResId, Group, ResourceType, NewConfig, Opts);
create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts);
{ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type}; {error, updating_to_incorrect_resource_type};
{error, not_found} -> {error, not_found} ->
{error, not_found} {error, not_found}
end. end.
create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> create_and_return_data(ResId, Group, ResourceType, Config, Opts) ->
_ = create(MgrId, ResId, Group, ResourceType, Config, Opts), _ = create(ResId, Group, ResourceType, Config, Opts),
{ok, _Group, Data} = lookup(ResId), {ok, _Group, Data} = lookup(ResId),
{ok, Data}. {ok, Data}.
%% @doc Create a resource_manager and wait until it is running %% @doc Create a resource_manager and wait until it is running
create(MgrId, ResId, Group, ResourceType, Config, Opts) -> create(ResId, Group, ResourceType, Config, Opts) ->
% The state machine will make the actual call to the callback/resource module after init % The state machine will make the actual call to the callback/resource module after init
ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts), ok = emqx_resource_manager_sup:ensure_child(ResId, Group, ResourceType, Config, Opts),
ok = emqx_metrics_worker:create_metrics( ok = emqx_metrics_worker:create_metrics(
?RES_METRICS, ?RES_METRICS,
ResId, ResId,
@ -164,15 +156,12 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) -> create_dry_run(ResourceType, Config) ->
ResId = make_test_id(), ResId = make_test_id(),
MgrId = set_new_owner(ResId),
Opts = Opts =
case is_map(Config) of case is_map(Config) of
true -> maps:get(resource_opts, Config, #{}); true -> maps:get(resource_opts, Config, #{});
false -> #{} false -> #{}
end, end,
ok = emqx_resource_manager_sup:ensure_child( ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts),
MgrId, ResId, <<"dry_run">>, ResourceType, Config, Opts
),
case wait_for_ready(ResId, 5000) of case wait_for_ready(ResId, 5000) of
ok -> ok ->
remove(ResId); remove(ResId);
@ -242,10 +231,11 @@ lookup(ResId) ->
%% @doc Lookup the group and data of a resource from the cache %% @doc Lookup the group and data of a resource from the cache
-spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
lookup_cached(ResId) -> lookup_cached(ResId) ->
case read_cache(ResId) of try read_cache(ResId) of
{Group, Data} -> Data = #data{group = Group} ->
{ok, Group, data_record_to_external_map(Data)}; {ok, Group, data_record_to_external_map(Data)}
not_found -> catch
error:badarg ->
{error, not_found} {error, not_found}
end. end.
@ -261,20 +251,16 @@ reset_metrics(ResId) ->
%% @doc Returns the data for all resources %% @doc Returns the data for all resources
-spec list_all() -> [resource_data()]. -spec list_all() -> [resource_data()].
list_all() -> list_all() ->
try lists:map(
[ fun data_record_to_external_map/1,
data_record_to_external_map(Data) gproc:select({local, names}, [{{?NAME('_'), '_', '$1'}, [], ['$1']}])
|| {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE) ).
]
catch
error:badarg -> []
end.
%% @doc Returns a list of ids for all the resources in a group %% @doc Returns a list of ids for all the resources in a group
-spec list_group(resource_group()) -> [resource_id()]. -spec list_group(resource_group()) -> [resource_id()].
list_group(Group) -> list_group(Group) ->
List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), Guard = {'==', {element, #data.group, '$1'}, Group},
lists:flatten(List). gproc:select({local, names}, [{{?NAME('$2'), '_', '$1'}, [Guard], ['$2']}]).
-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}. -spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
health_check(ResId) -> health_check(ResId) ->
@ -283,10 +269,9 @@ health_check(ResId) ->
%% Server start/stop callbacks %% Server start/stop callbacks
%% @doc Function called from the supervisor to actually start the server %% @doc Function called from the supervisor to actually start the server
start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> start_link(ResId, Group, ResourceType, Config, Opts) ->
Data = #data{ Data = #data{
id = ResId, id = ResId,
manager_id = MgrId,
group = Group, group = Group,
mod = ResourceType, mod = ResourceType,
callback_mode = emqx_resource:get_callback_mode(ResourceType), callback_mode = emqx_resource:get_callback_mode(ResourceType),
@ -300,7 +285,7 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
state = undefined, state = undefined,
error = undefined error = undefined
}, },
gen_statem:start_link(?MODULE, {Data, Opts}, []). gen_statem:start_link(?REF(ResId), ?MODULE, {Data, Opts}, []).
init({DataIn, Opts}) -> init({DataIn, Opts}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
@ -320,7 +305,7 @@ terminate({shutdown, removed}, _State, _Data) ->
ok; ok;
terminate(_Reason, _State, Data) -> terminate(_Reason, _State, Data) ->
_ = maybe_stop_resource(Data), _ = maybe_stop_resource(Data),
ok = delete_cache(Data#data.id, Data#data.manager_id), _ = erase_cache(Data),
ok. ok.
%% Behavior callback %% Behavior callback
@ -345,9 +330,6 @@ handle_event({call, From}, start, State, Data) when
start_resource(Data, From); start_resource(Data, From);
handle_event({call, From}, start, _State, _Data) -> handle_event({call, From}, start, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
% Called when the resource received a `quit` message
handle_event(info, quit, _State, _Data) ->
{stop, {shutdown, quit}};
% Called when the resource is to be stopped % Called when the resource is to be stopped
handle_event({call, From}, stop, stopped, _Data) -> handle_event({call, From}, stop, stopped, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
@ -418,9 +400,9 @@ log_state_consistency(State, Data) ->
data => Data data => Data
}). }).
log_cache_consistency({_, Data}, Data) -> log_cache_consistency(Data, Data) ->
ok; ok;
log_cache_consistency({_, DataCached}, Data) -> log_cache_consistency(DataCached, Data) ->
?tp(warning, "inconsistent_cache", #{ ?tp(warning, "inconsistent_cache", #{
cache => DataCached, cache => DataCached,
data => Data data => Data
@ -429,56 +411,20 @@ log_cache_consistency({_, DataCached}, Data) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% internal functions %% internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> insert_cache(ResId, Data = #data{}) ->
case get_owner(ResId) of gproc:set_value(?NAME(ResId), Data).
not_found ->
ets:insert(?ETS_TABLE, {ResId, Group, Data});
MgrId ->
ets:insert(?ETS_TABLE, {ResId, Group, Data});
_ ->
?SLOG(error, #{
msg => get_resource_owner_failed,
resource_id => ResId,
action => quit_resource
}),
self() ! quit
end.
read_cache(ResId) -> read_cache(ResId) ->
case ets:lookup(?ETS_TABLE, ResId) of gproc:lookup_value(?NAME(ResId)).
[{_Id, Group, Data}] -> {Group, Data};
[] -> not_found
end.
delete_cache(ResId, MgrId) -> erase_cache(_Data = #data{id = ResId}) ->
case get_owner(ResId) of gproc:unreg(?NAME(ResId)).
MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId ->
do_delete_cache(ResId);
_ ->
ok
end.
do_delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) -> try_read_cache(ResId) ->
true = ets:delete(?ETS_TABLE, {owner, ResId}), try
true = ets:delete(?ETS_TABLE, ResId), read_cache(ResId)
ok; catch
do_delete_cache(ResId) -> error:badarg -> not_found
true = ets:delete(?ETS_TABLE, ResId),
ok.
set_new_owner(ResId) ->
MgrId = make_manager_id(ResId),
ok = set_owner(ResId, MgrId),
MgrId.
set_owner(ResId, MgrId) ->
ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}),
ok.
get_owner(ResId) ->
case ets:lookup(?ETS_TABLE, {owner, ResId}) of
[{_, MgrId}] -> MgrId;
[] -> not_found
end. end.
retry_actions(Data) -> retry_actions(Data) ->
@ -494,17 +440,17 @@ health_check_actions(Data) ->
handle_remove_event(From, ClearMetrics, Data) -> handle_remove_event(From, ClearMetrics, Data) ->
_ = stop_resource(Data), _ = stop_resource(Data),
ok = delete_cache(Data#data.id, Data#data.manager_id),
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
case ClearMetrics of case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok false -> ok
end, end,
_ = erase_cache(Data),
{stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}.
start_resource(Data, From) -> start_resource(Data, From) ->
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
{ok, ResourceState} -> {ok, ResourceState} ->
UpdatedData = Data#data{status = connecting, state = ResourceState}, UpdatedData = Data#data{status = connecting, state = ResourceState},
%% Perform an initial health_check immediately before transitioning into a connected state %% Perform an initial health_check immediately before transitioning into a connected state
@ -535,7 +481,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
%% is returned. %% is returned.
case ResState /= undefined of case ResState /= undefined of
true -> true ->
emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState); emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState);
false -> false ->
ok ok
end, end,
@ -589,7 +535,7 @@ with_health_check(#data{state = undefined} = Data, Func) ->
Func(disconnected, Data); Func(disconnected, Data);
with_health_check(#data{error = PrevError} = Data, Func) -> with_health_check(#data{error = PrevError} = Data, Func) ->
ResId = Data#data.id, ResId = Data#data.id,
HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state),
{Status, NewState, Err} = parse_health_check_result(HCRes, Data), {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
_ = maybe_alarm(Status, ResId, Err, PrevError), _ = maybe_alarm(Status, ResId, Err, PrevError),
ok = maybe_resume_resource_workers(ResId, Status), ok = maybe_resume_resource_workers(ResId, Status),
@ -604,7 +550,7 @@ update_state(Data) ->
update_state(DataWas, DataWas) -> update_state(DataWas, DataWas) ->
DataWas; DataWas;
update_state(Data, _DataWas) -> update_state(Data, _DataWas) ->
_ = insert_cache(Data#data.id, Data#data.group, Data), _ = insert_cache(Data#data.id, Data),
Data. Data.
health_check_interval(Opts) -> health_check_interval(Opts) ->
@ -694,10 +640,10 @@ wait_for_ready(ResId, WaitTime) ->
do_wait_for_ready(_ResId, 0) -> do_wait_for_ready(_ResId, 0) ->
timeout; timeout;
do_wait_for_ready(ResId, Retry) -> do_wait_for_ready(ResId, Retry) ->
case read_cache(ResId) of case try_read_cache(ResId) of
{_Group, #data{status = connected}} -> #data{status = connected} ->
ok; ok;
{_Group, #data{status = disconnected, error = Err}} -> #data{status = disconnected, error = Err} ->
{error, external_error(Err)}; {error, external_error(Err)};
_ -> _ ->
timer:sleep(?WAIT_FOR_RESOURCE_DELAY), timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
@ -706,12 +652,7 @@ do_wait_for_ready(ResId, Retry) ->
safe_call(ResId, Message, Timeout) -> safe_call(ResId, Message, Timeout) ->
try try
case read_cache(ResId) of gen_statem:call(?REF(ResId), Message, {clean_timeout, Timeout})
not_found ->
{error, not_found};
{_, #data{pid = ManagerPid}} ->
gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout})
end
catch catch
error:badarg -> error:badarg ->
{error, not_found}; {error, not_found};

View File

@ -17,23 +17,20 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([ensure_child/6]). -export([ensure_child/5]).
-export([start_link/0]). -export([start_link/0]).
-export([init/1]). -export([init/1]).
ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts) -> ensure_child(ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, [MgrId, ResId, Group, ResourceType, Config, Opts]), _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]),
ok. ok.
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
TabOpts = [named_table, set, public, {read_concurrency, true}],
_ = ets:new(emqx_resource_manager, TabOpts),
ChildSpecs = [ ChildSpecs = [
#{ #{
id => emqx_resource_manager, id => emqx_resource_manager,
@ -44,6 +41,5 @@ init([]) ->
modules => [emqx_resource_manager] modules => [emqx_resource_manager]
} }
], ],
SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10},
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.

View File

@ -1055,28 +1055,22 @@ t_list_filter(_) ->
). ).
t_create_dry_run_local(_) -> t_create_dry_run_local(_) ->
ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}),
lists:foreach( lists:foreach(
fun(_) -> fun(_) ->
create_dry_run_local_succ() create_dry_run_local_succ()
end, end,
lists:seq(1, 10) lists:seq(1, 10)
), ),
case [] =:= ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of ?retry(
false -> 100,
%% Sleep to remove flakyness in test case. It take some time for 5,
%% the ETS table to be cleared. ?assertEqual(
timer:sleep(2000), [],
[] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}); emqx_resource:list_instances_verbose()
true -> )
ok ).
end.
create_dry_run_local_succ() -> create_dry_run_local_succ() ->
case whereis(test_resource) of
undefined -> ok;
Pid -> exit(Pid, kill)
end,
?assertEqual( ?assertEqual(
ok, ok,
emqx_resource:create_dry_run_local( emqx_resource:create_dry_run_local(
@ -1107,7 +1101,15 @@ t_create_dry_run_local_failed(_) ->
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, stop_error => true} #{name => test_resource, stop_error => true}
), ),
?assertEqual(ok, Res3). ?assertEqual(ok, Res3),
?retry(
100,
5,
?assertEqual(
[],
emqx_resource:list_instances_verbose()
)
).
t_test_func(_) -> t_test_func(_) ->
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])), ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),

View File

@ -502,11 +502,6 @@ resource_id(Config) ->
Name = ?config(influxdb_name, Config), Name = ?config(influxdb_name, Config),
emqx_bridge_resource:resource_id(Type, Name). emqx_bridge_resource:resource_id(Type, Name).
instance_id(Config) ->
ResourceId = resource_id(Config),
[{_, InstanceId}] = ets:lookup(emqx_resource_manager, {owner, ResourceId}),
InstanceId.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -581,14 +576,14 @@ t_start_already_started(Config) ->
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
InstanceId = instance_id(Config), ResourceId = resource_id(Config),
TypeAtom = binary_to_atom(Type), TypeAtom = binary_to_atom(Type),
NameAtom = binary_to_atom(Name), NameAtom = binary_to_atom(Name),
{ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check(
emqx_bridge_schema, InfluxDBConfigString emqx_bridge_schema, InfluxDBConfigString
), ),
?check_trace( ?check_trace(
emqx_ee_connector_influxdb:on_start(InstanceId, InfluxDBConfigMap), emqx_ee_connector_influxdb:on_start(ResourceId, InfluxDBConfigMap),
fun(Result, Trace) -> fun(Result, Trace) ->
?assertMatch({ok, _}, Result), ?assertMatch({ok, _}, Result),
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),

View File

@ -1,6 +1,6 @@
{application, emqx_ee_connector, [ {application, emqx_ee_connector, [
{description, "EMQX Enterprise connectors"}, {description, "EMQX Enterprise connectors"},
{vsn, "0.1.11"}, {vsn, "0.1.12"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -267,9 +267,8 @@ apply_template([{Key, _} | _] = Reqs, Templates) ->
[emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
end. end.
client_id(InstanceId) -> client_id(ResourceId) ->
Name = emqx_resource_manager:manager_id_to_resource_id(InstanceId), erlang:binary_to_atom(ResourceId, utf8).
erlang:binary_to_atom(Name, utf8).
redact(Msg) -> redact(Msg) ->
emqx_utils:redact(Msg, fun is_sensitive_key/1). emqx_utils:redact(Msg, fun is_sensitive_key/1).

View File

@ -174,7 +174,7 @@ callback_mode() -> async_if_possible.
is_buffer_supported() -> false. is_buffer_supported() -> false.
on_start( on_start(
InstanceId = PoolName, ResourceId = PoolName,
#{ #{
server := Server, server := Server,
username := Username, username := Username,
@ -187,7 +187,7 @@ on_start(
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_sqlserver_connector", msg => "starting_sqlserver_connector",
connector => InstanceId, connector => ResourceId,
config => emqx_utils:redact(Config) config => emqx_utils:redact(Config)
}), }),
@ -212,7 +212,7 @@ on_start(
], ],
State = #{ State = #{
%% also InstanceId %% also ResourceId
pool_name => PoolName, pool_name => PoolName,
sql_templates => parse_sql_template(Config), sql_templates => parse_sql_template(Config),
resource_opts => ResourceOpts resource_opts => ResourceOpts
@ -228,15 +228,15 @@ on_start(
{error, Reason} {error, Reason}
end. end.
on_stop(InstanceId, #{pool_name := PoolName} = _State) -> on_stop(ResourceId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_sqlserver_connector", msg => "stopping_sqlserver_connector",
connector => InstanceId connector => ResourceId
}), }),
emqx_resource_pool:stop(PoolName). emqx_resource_pool:stop(ResourceId).
-spec on_query( -spec on_query(
manager_id(), resource_id(),
{?ACTION_SEND_MESSAGE, map()}, {?ACTION_SEND_MESSAGE, map()},
state() state()
) -> ) ->
@ -244,16 +244,16 @@ on_stop(InstanceId, #{pool_name := PoolName} = _State) ->
| {ok, list()} | {ok, list()}
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
?TRACE( ?TRACE(
"SINGLE_QUERY_SYNC", "SINGLE_QUERY_SYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => Query, connector => InstanceId, state => State} #{requests => Query, connector => ResourceId, state => State}
), ),
do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State).
-spec on_query_async( -spec on_query_async(
manager_id(), resource_id(),
{?ACTION_SEND_MESSAGE, map()}, {?ACTION_SEND_MESSAGE, map()},
{ReplyFun :: function(), Args :: list()}, {ReplyFun :: function(), Args :: list()},
state() state()
@ -261,7 +261,7 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
{ok, any()} {ok, any()}
| {error, term()}. | {error, term()}.
on_query_async( on_query_async(
InstanceId, ResourceId,
{?ACTION_SEND_MESSAGE, _Msg} = Query, {?ACTION_SEND_MESSAGE, _Msg} = Query,
ReplyFunAndArgs, ReplyFunAndArgs,
State State
@ -269,12 +269,12 @@ on_query_async(
?TRACE( ?TRACE(
"SINGLE_QUERY_ASYNC", "SINGLE_QUERY_ASYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => Query, connector => InstanceId, state => State} #{requests => Query, connector => ResourceId, state => State}
), ),
do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). do_query(ResourceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
-spec on_batch_query( -spec on_batch_query(
manager_id(), resource_id(),
[{?ACTION_SEND_MESSAGE, map()}], [{?ACTION_SEND_MESSAGE, map()}],
state() state()
) -> ) ->
@ -282,29 +282,29 @@ on_query_async(
| {ok, list()} | {ok, list()}
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
on_batch_query(InstanceId, BatchRequests, State) -> on_batch_query(ResourceId, BatchRequests, State) ->
?TRACE( ?TRACE(
"BATCH_QUERY_SYNC", "BATCH_QUERY_SYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => BatchRequests, connector => InstanceId, state => State} #{requests => BatchRequests, connector => ResourceId, state => State}
), ),
do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
-spec on_batch_query_async( -spec on_batch_query_async(
manager_id(), resource_id(),
[{?ACTION_SEND_MESSAGE, map()}], [{?ACTION_SEND_MESSAGE, map()}],
{ReplyFun :: function(), Args :: list()}, {ReplyFun :: function(), Args :: list()},
state() state()
) -> {ok, any()}. ) -> {ok, any()}.
on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
?TRACE( ?TRACE(
"BATCH_QUERY_ASYNC", "BATCH_QUERY_ASYNC",
"bridge_sqlserver_received", "bridge_sqlserver_received",
#{requests => Requests, connector => InstanceId, state => State} #{requests => Requests, connector => ResourceId, state => State}
), ),
do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). do_query(ResourceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> on_get_status(_ResourceId, #{pool_name := PoolName} = _State) ->
Health = emqx_resource_pool:health_check_workers( Health = emqx_resource_pool:health_check_workers(
PoolName, PoolName,
{?MODULE, do_get_status, []} {?MODULE, do_get_status, []}
@ -366,7 +366,7 @@ conn_str([{_, _} | Opts], Acc) ->
%% Sync & Async query with singe & batch sql statement %% Sync & Async query with singe & batch sql statement
-spec do_query( -spec do_query(
manager_id(), resource_id(),
Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
ApplyMode :: ApplyMode ::
handover handover
@ -377,7 +377,7 @@ conn_str([{_, _} | Opts], Acc) ->
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.
do_query( do_query(
InstanceId, ResourceId,
Query, Query,
ApplyMode, ApplyMode,
#{pool_name := PoolName, sql_templates := Templates} = State #{pool_name := PoolName, sql_templates := Templates} = State
@ -385,7 +385,7 @@ do_query(
?TRACE( ?TRACE(
"SINGLE_QUERY_SYNC", "SINGLE_QUERY_SYNC",
"sqlserver_connector_received", "sqlserver_connector_received",
#{query => Query, connector => InstanceId, state => State} #{query => Query, connector => ResourceId, state => State}
), ),
%% only insert sql statement for single query and batch query %% only insert sql statement for single query and batch query
@ -409,7 +409,7 @@ do_query(
), ),
?SLOG(error, #{ ?SLOG(error, #{
msg => "sqlserver_connector_do_query_failed", msg => "sqlserver_connector_do_query_failed",
connector => InstanceId, connector => ResourceId,
query => Query, query => Query,
reason => Reason reason => Reason
}), }),
@ -423,9 +423,9 @@ do_query(
end. end.
worker_do_insert( worker_do_insert(
Conn, SQL, #{resource_opts := ResourceOpts, pool_name := InstanceId} = State Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State
) -> ) ->
LogMeta = #{connector => InstanceId, state => State}, LogMeta = #{connector => ResourceId, state => State},
try try
case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of
{selected, Rows, _} -> {selected, Rows, _} ->