Merge pull request #13449 from zhongwencool/resource-log

feat: add group/type to resource slog
This commit is contained in:
zhongwencool 2024-07-24 14:34:25 +08:00 committed by GitHub
commit c7a7658c7a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
88 changed files with 419 additions and 208 deletions

View File

@ -30,6 +30,6 @@
-type authenticator_id() :: binary(). -type authenticator_id() :: binary().
-define(AUTHN_RESOURCE_GROUP, <<"emqx_authn">>). -define(AUTHN_RESOURCE_GROUP, <<"authn">>).
-endif. -endif.

View File

@ -158,7 +158,7 @@
count => 1 count => 1
}). }).
-define(AUTHZ_RESOURCE_GROUP, <<"emqx_authz">>). -define(AUTHZ_RESOURCE_GROUP, <<"authz">>).
-define(AUTHZ_FEATURES, [rich_actions]). -define(AUTHZ_FEATURES, [rich_actions]).

View File

@ -66,7 +66,11 @@ description() ->
create(Config) -> create(Config) ->
NConfig = parse_config(Config), NConfig = parse_config(Config),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_bridge_http_connector, NConfig), {ok, _Data} = emqx_authz_utils:create_resource(
ResourceId,
emqx_bridge_http_connector,
NConfig
),
NConfig#{annotations => #{id => ResourceId}}. NConfig#{annotations => #{id => ResourceId}}.
update(Config) -> update(Config) ->

View File

@ -22,6 +22,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -32,6 +33,8 @@
-define(DEFAULT_POOL_SIZE, 8). -define(DEFAULT_POOL_SIZE, 8).
resource_type() -> jwks.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, Opts) -> on_start(InstId, Opts) ->

View File

@ -186,7 +186,8 @@ do_create(
ResourceId, ResourceId,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
emqx_authn_jwks_connector, emqx_authn_jwks_connector,
connector_opts(Config) connector_opts(Config),
#{}
), ),
{ok, #{ {ok, #{
jwk_resource => ResourceId, jwk_resource => ResourceId,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_auth_ldap, [ {application, emqx_auth_ldap, [
{description, "EMQX LDAP Authentication and Authorization"}, {description, "EMQX LDAP Authentication and Authorization"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{mod, {emqx_auth_ldap_app, []}}, {mod, {emqx_auth_ldap_app, []}},
{applications, [ {applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_auth_mongodb, [ {application, emqx_auth_mongodb, [
{description, "EMQX MongoDB Authentication and Authorization"}, {description, "EMQX MongoDB Authentication and Authorization"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{mod, {emqx_auth_mongodb_app, []}}, {mod, {emqx_auth_mongodb_app, []}},
{applications, [ {applications, [

View File

@ -198,9 +198,9 @@ test_user_auth(#{
t_authenticate_disabled_prepared_statements(_Config) -> t_authenticate_disabled_prepared_statements(_Config) ->
ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}), ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}),
{ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig), {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig, #{}),
on_exit(fun() -> on_exit(fun() ->
emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config()) emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config(), #{})
end), end),
ok = lists:foreach( ok = lists:foreach(
fun(Sample0) -> fun(Sample0) ->

View File

@ -196,7 +196,7 @@ create(Type, Name, Conf0, Opts) ->
Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name},
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
resource_id(Type, Name), resource_id(Type, Name),
<<"emqx_bridge">>, <<"bridge">>,
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf), parse_confs(TypeBin, Name, Conf),
parse_opts(Conf, Opts) parse_opts(Conf, Opts)
@ -282,7 +282,7 @@ create_dry_run(Type0, Conf0) ->
create_dry_run_bridge_v1(Type, Conf0) -> create_dry_run_bridge_v1(Type, Conf0) ->
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
TmpPath = emqx_utils:safe_filename(TmpName), TmpPath = emqx_utils:safe_filename(TmpName),
%% Already typechecked, no need to catch errors %% Already type checked, no need to catch errors
TypeBin = bin(Type), TypeBin = bin(Type),
TypeAtom = safe_atom(Type), TypeAtom = safe_atom(Type),
Conf1 = maps:without([<<"name">>], Conf0), Conf1 = maps:without([<<"name">>], Conf0),

View File

@ -1110,6 +1110,7 @@ t_query_uses_action_query_mode(_Config) ->
%% ... now we use a quite different query mode for the action %% ... now we use a quite different query mode for the action
meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
meck:expect(con_mod(), resource_type, 0, dummy),
meck:expect(con_mod(), callback_mode, 0, async_if_possible), meck:expect(con_mod(), callback_mode, 0, async_if_possible),
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),

View File

@ -293,6 +293,7 @@ init_mocks() ->
meck:new(emqx_connector_resource, [passthrough, no_link]), meck:new(emqx_connector_resource, [passthrough, no_link]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL),
meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy),
meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible),
meck:expect( meck:expect(
?CONNECTOR_IMPL, ?CONNECTOR_IMPL,

View File

@ -15,15 +15,17 @@
%% this module is only intended to be mocked %% this module is only intended to be mocked
-module(emqx_bridge_v2_dummy_connector). -module(emqx_bridge_v2_dummy_connector).
-behavior(emqx_resource).
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_add_channel/4, on_add_channel/4,
on_get_channel_status/3 on_get_channel_status/3
]). ]).
resource_type() -> dummy.
callback_mode() -> error(unexpected). callback_mode() -> error(unexpected).
on_start(_, _) -> error(unexpected). on_start(_, _) -> error(unexpected).
on_stop(_, _) -> error(unexpected). on_stop(_, _) -> error(unexpected).

View File

@ -19,6 +19,7 @@
-export([ -export([
query_mode/1, query_mode/1,
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -34,6 +35,8 @@
query_mode(_Config) -> query_mode(_Config) ->
sync. sync.
resource_type() -> test_connector.
callback_mode() -> callback_mode() ->
always_sync. always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_cassandra, [ {application, emqx_bridge_cassandra, [
{description, "EMQX Enterprise Cassandra Bridge"}, {description, "EMQX Enterprise Cassandra Bridge"},
{vsn, "0.3.1"}, {vsn, "0.3.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -19,6 +19,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -94,6 +95,7 @@ desc("connector") ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks for emqx_resource %% callbacks for emqx_resource
resource_type() -> cassandra.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_clickhouse, [ {application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"}, {description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.4.1"}, {vsn, "0.4.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -29,6 +29,7 @@
%% callbacks for behaviour emqx_resource %% callbacks for behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -128,6 +129,7 @@ values(_) ->
%% =================================================================== %% ===================================================================
%% Callbacks defined in emqx_resource %% Callbacks defined in emqx_resource
%% =================================================================== %% ===================================================================
resource_type() -> clickhouse.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_dynamo, [ {application, emqx_bridge_dynamo, [
{description, "EMQX Enterprise Dynamo Bridge"}, {description, "EMQX Enterprise Dynamo Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -17,6 +17,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -68,6 +69,7 @@ fields(config) ->
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> dynamo.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_es, [ {application, emqx_bridge_es, [
{description, "EMQX Enterprise Elastic Search Bridge"}, {description, "EMQX Enterprise Elastic Search Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{modules, [ {modules, [
emqx_bridge_es, emqx_bridge_es,
emqx_bridge_es_connector emqx_bridge_es_connector

View File

@ -14,6 +14,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -207,6 +208,8 @@ base_url(#{server := Server}) -> "http://" ++ Server.
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> elastic_search.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [ {application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"}, {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.3.1"}, {vsn, "0.3.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -8,6 +8,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -84,6 +85,8 @@
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
-spec resource_type() -> resource_type().
resource_type() -> gcp_pubsub_consumer.
-spec callback_mode() -> callback_mode(). -spec callback_mode() -> callback_mode().
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -41,6 +41,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -62,6 +63,7 @@
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
resource_type() -> gcp_pubsub.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -16,6 +16,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -67,6 +68,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
resource_type() -> greptimedb.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_add_channel( on_add_channel(

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_hstreamdb, [ {application, emqx_bridge_hstreamdb, [
{description, "EMQX Enterprise HStreamDB Bridge"}, {description, "EMQX Enterprise HStreamDB Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -16,6 +16,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -44,6 +45,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
resource_type() -> hstreamdb.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, Config) -> on_start(InstId, Config) ->

View File

@ -26,6 +26,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -183,6 +184,7 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field). ref(Field) -> hoconsc:ref(?MODULE, Field).
%% =================================================================== %% ===================================================================
resource_type() -> webhook.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [ {application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"}, {description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.2.3"}, {vsn, "0.2.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -16,6 +16,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -70,6 +71,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
resource_type() -> influxdb.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_add_channel( on_add_channel(

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [ {application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"}, {description, "EMQX Enterprise Apache IoTDB Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{modules, [ {modules, [
emqx_bridge_iotdb, emqx_bridge_iotdb,
emqx_bridge_iotdb_connector emqx_bridge_iotdb_connector

View File

@ -15,6 +15,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -206,6 +207,8 @@ proplists_without(Keys, List) ->
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> iotdb.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().

View File

@ -7,6 +7,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -125,6 +126,7 @@
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> kafka_consumer.
callback_mode() -> callback_mode() ->
async_if_possible. async_if_possible.
@ -628,16 +630,6 @@ consumer_group_id(BridgeName0) ->
BridgeName = to_bin(BridgeName0), BridgeName = to_bin(BridgeName0),
<<"emqx-kafka-consumer-", BridgeName/binary>>. <<"emqx-kafka-consumer-", BridgeName/binary>>.
-spec is_dry_run(connector_resource_id()) -> boolean().
is_dry_run(ConnectorResId) ->
TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, ConnectorResId)
end.
-spec check_client_connectivity(pid()) -> -spec check_client_connectivity(pid()) ->
?status_connected ?status_connected
| ?status_disconnected | ?status_disconnected
@ -673,7 +665,7 @@ maybe_clean_error(Reason) ->
-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
make_client_id(ConnectorResId, BridgeType, BridgeName) -> make_client_id(ConnectorResId, BridgeType, BridgeName) ->
case is_dry_run(ConnectorResId) of case emqx_resource:is_dry_run(ConnectorResId) of
false -> false ->
ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
binary_to_atom(ClientID0); binary_to_atom(ClientID0);

View File

@ -10,6 +10,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
query_mode/1, query_mode/1,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
@ -35,6 +36,8 @@
-define(kafka_client_id, kafka_client_id). -define(kafka_client_id, kafka_client_id).
-define(kafka_producers, kafka_producers). -define(kafka_producers, kafka_producers).
resource_type() -> kafka_producer.
query_mode(#{parameters := #{query_mode := sync}}) -> query_mode(#{parameters := #{query_mode := sync}}) ->
simple_sync_internal_buffer; simple_sync_internal_buffer;
query_mode(_) -> query_mode(_) ->
@ -137,14 +140,7 @@ create_producers_for_bridge_v2(
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
#{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), IsDryRun = emqx_resource:is_dry_run(BridgeV2Id),
IsDryRun =
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, InstId)
end,
ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config( WolffProducerConfig = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kinesis, [ {application, emqx_bridge_kinesis, [
{description, "EMQX Enterprise Amazon Kinesis Bridge"}, {description, "EMQX Enterprise Amazon Kinesis Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -30,6 +30,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -50,6 +51,7 @@
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
resource_type() -> kinesis_producer.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mongodb, [ {application, emqx_bridge_mongodb, [
{description, "EMQX Enterprise MongoDB Bridge"}, {description, "EMQX Enterprise MongoDB Bridge"},
{vsn, "0.3.2"}, {vsn, "0.3.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -11,6 +11,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
on_remove_channel/3, on_remove_channel/3,
resource_type/0,
callback_mode/0, callback_mode/0,
on_add_channel/4, on_add_channel/4,
on_get_channel_status/3, on_get_channel_status/3,
@ -25,6 +26,7 @@
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> emqx_mongodb:resource_type().
callback_mode() -> emqx_mongodb:callback_mode(). callback_mode() -> emqx_mongodb:callback_mode().

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [ {application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"}, {description, "EMQX MQTT Broker Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -30,6 +30,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -76,6 +77,8 @@ on_message_received(Msg, HookPoints, ResId) ->
ok. ok.
%% =================================================================== %% ===================================================================
resource_type() -> mqtt.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_start(ResourceId, #{server := Server} = Conf) -> on_start(ResourceId, #{server := Server} = Conf) ->
@ -207,7 +210,7 @@ start_mqtt_clients(ResourceId, Conf) ->
start_mqtt_clients(ResourceId, Conf, ClientOpts). start_mqtt_clients(ResourceId, Conf, ClientOpts).
start_mqtt_clients(ResourceId, StartConf, ClientOpts) -> start_mqtt_clients(ResourceId, StartConf, ClientOpts) ->
PoolName = <<ResourceId/binary>>, PoolName = ResourceId,
#{ #{
pool_size := PoolSize pool_size := PoolSize
} = StartConf, } = StartConf,
@ -227,7 +230,7 @@ start_mqtt_clients(ResourceId, StartConf, ClientOpts) ->
on_stop(ResourceId, State) -> on_stop(ResourceId, State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_mqtt_connector", msg => "stopping_mqtt_connector",
connector => ResourceId resource_id => ResourceId
}), }),
%% on_stop can be called with State = undefined %% on_stop can be called with State = undefined
StateMap = StateMap =
@ -271,7 +274,7 @@ on_query(
on_query(ResourceId, {_ChannelId, Msg}, #{}) -> on_query(ResourceId, {_ChannelId, Msg}, #{}) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "forwarding_unavailable", msg => "forwarding_unavailable",
connector => ResourceId, resource_id => ResourceId,
message => Msg, message => Msg,
reason => "Egress is not configured" reason => "Egress is not configured"
}). }).
@ -298,7 +301,7 @@ on_query_async(
on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "forwarding_unavailable", msg => "forwarding_unavailable",
connector => ResourceId, resource_id => ResourceId,
message => Msg, message => Msg,
reason => "Egress is not configured" reason => "Egress is not configured"
}). }).
@ -463,8 +466,10 @@ connect(Options) ->
{ok, Pid} -> {ok, Pid} ->
connect(Pid, Name); connect(Pid, Name);
{error, Reason} = Error -> {error, Reason} = Error ->
?SLOG(error, #{ IsDryRun = emqx_resource:is_dry_run(Name),
?SLOG(?LOG_LEVEL(IsDryRun), #{
msg => "client_start_failed", msg => "client_start_failed",
resource_id => Name,
config => emqx_utils:redact(ClientOpts), config => emqx_utils:redact(ClientOpts),
reason => Reason reason => Reason
}), }),
@ -508,10 +513,11 @@ connect(Pid, Name) ->
{ok, _Props} -> {ok, _Props} ->
{ok, Pid}; {ok, Pid};
{error, Reason} = Error -> {error, Reason} = Error ->
?SLOG(warning, #{ IsDryRun = emqx_resource:is_dry_run(Name),
?SLOG(?LOG_LEVEL(IsDryRun), #{
msg => "ingress_client_connect_failed", msg => "ingress_client_connect_failed",
reason => Reason, reason => Reason,
name => Name resource_id => Name
}), }),
_ = catch emqtt:stop(Pid), _ = catch emqtt:stop(Pid),
Error Error

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mysql, [ {application, emqx_bridge_mysql, [
{description, "EMQX Enterprise MySQL Bridge"}, {description, "EMQX Enterprise MySQL Bridge"},
{vsn, "0.1.7"}, {vsn, "0.1.8"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -10,6 +10,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
on_remove_channel/3, on_remove_channel/3,
resource_type/0,
callback_mode/0, callback_mode/0,
on_add_channel/4, on_add_channel/4,
on_batch_query/3, on_batch_query/3,
@ -24,6 +25,7 @@
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> emqx_mysql:resource_type().
callback_mode() -> emqx_mysql:callback_mode(). callback_mode() -> emqx_mysql:callback_mode().

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_opents, [ {application, emqx_bridge_opents, [
{description, "EMQX Enterprise OpenTSDB Bridge"}, {description, "EMQX Enterprise OpenTSDB Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -18,6 +18,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -114,6 +115,8 @@ connector_example_values() ->
-define(HTTP_CONNECT_TIMEOUT, 1000). -define(HTTP_CONNECT_TIMEOUT, 1000).
resource_type() -> opents.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start( on_start(

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [ {application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"}, {description, "EMQX Pulsar Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -10,6 +10,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -55,6 +56,7 @@
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> pulsar.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
@ -255,7 +257,7 @@ format_servers(Servers0) ->
-spec make_client_id(resource_id()) -> pulsar_client_id(). -spec make_client_id(resource_id()) -> pulsar_client_id().
make_client_id(InstanceId) -> make_client_id(InstanceId) ->
case is_dry_run(InstanceId) of case emqx_resource:is_dry_run(InstanceId) of
true -> true ->
pulsar_producer_probe; pulsar_producer_probe;
false -> false ->
@ -269,14 +271,6 @@ make_client_id(InstanceId) ->
binary_to_atom(ClientIdBin) binary_to_atom(ClientIdBin)
end. end.
-spec is_dry_run(resource_id()) -> boolean().
is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch -> false;
_ -> string:equal(TestIdStart, InstanceId)
end.
conn_opts(#{authentication := none}) -> conn_opts(#{authentication := none}) ->
#{}; #{};
conn_opts(#{authentication := #{username := Username, password := Password}}) -> conn_opts(#{authentication := #{username := Username, password := Password}}) ->
@ -297,7 +291,7 @@ replayq_dir(ClientId) ->
filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]).
producer_name(InstanceId, ChannelId) -> producer_name(InstanceId, ChannelId) ->
case is_dry_run(InstanceId) of case emqx_resource:is_dry_run(InstanceId) of
%% do not create more atom %% do not create more atom
true -> true ->
pulsar_producer_probe_worker; pulsar_producer_probe_worker;

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [ {application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"}, {description, "EMQX Enterprise RabbitMQ Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{mod, {emqx_bridge_rabbitmq_app, []}}, {mod, {emqx_bridge_rabbitmq_app, []}},
{applications, [ {applications, [

View File

@ -31,6 +31,7 @@
on_remove_channel/3, on_remove_channel/3,
on_get_channels/1, on_get_channels/1,
on_stop/2, on_stop/2,
resource_type/0,
callback_mode/0, callback_mode/0,
on_get_status/2, on_get_status/2,
on_get_channel_status/3, on_get_channel_status/3,
@ -60,6 +61,7 @@ fields(config) ->
%% =================================================================== %% ===================================================================
%% emqx_resource callback %% emqx_resource callback
resource_type() -> rabbitmq.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_redis, [ {application, emqx_bridge_redis, [
{description, "EMQX Enterprise Redis Bridge"}, {description, "EMQX Enterprise Redis Bridge"},
{vsn, "0.1.8"}, {vsn, "0.1.9"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -12,6 +12,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_add_channel/4, on_add_channel/4,
on_remove_channel/3, on_remove_channel/3,
@ -29,7 +30,9 @@
%% resource callbacks %% resource callbacks
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
callback_mode() -> always_sync. resource_type() -> emqx_redis:resource_type().
callback_mode() -> emqx_redis:callback_mode().
on_add_channel( on_add_channel(
_InstanceId, _InstanceId,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [ {application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [ {env, [

View File

@ -16,6 +16,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -90,6 +91,8 @@ servers() ->
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> rocketmq.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start( on_start(

View File

@ -13,6 +13,7 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -92,6 +93,8 @@
-define(AGGREG_SUP, emqx_bridge_s3_sup). -define(AGGREG_SUP, emqx_bridge_s3_sup).
%% %%
-spec resource_type() -> resource_type().
resource_type() -> s3.
-spec callback_mode() -> callback_mode(). -spec callback_mode() -> callback_mode().
callback_mode() -> callback_mode() ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_sqlserver, [ {application, emqx_bridge_sqlserver, [
{description, "EMQX Enterprise SQL Server Bridge"}, {description, "EMQX Enterprise SQL Server Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, odbc]}, {applications, [kernel, stdlib, emqx_resource, odbc]},
{env, [ {env, [

View File

@ -30,6 +30,7 @@
%% callbacks for behaviour emqx_resource %% callbacks for behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -173,6 +174,7 @@ server() ->
%%==================================================================== %%====================================================================
%% Callbacks defined in emqx_resource %% Callbacks defined in emqx_resource
%%==================================================================== %%====================================================================
resource_type() -> sqlserver.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_syskeeper, [ {application, emqx_bridge_syskeeper, [
{description, "EMQX Enterprise Data bridge for Syskeeper"}, {description, "EMQX Enterprise Data bridge for Syskeeper"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -18,6 +18,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -147,6 +148,7 @@ server() ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
resource_type() -> syskeeper.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -12,6 +12,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -40,6 +41,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% emqx_resource %% emqx_resource
resource_type() ->
syskeeper_proxy_server.
query_mode(_) -> query_mode(_) ->
no_queries. no_queries.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_tdengine, [ {application, emqx_bridge_tdengine, [
{description, "EMQX Enterprise TDEngine Bridge"}, {description, "EMQX Enterprise TDEngine Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -19,6 +19,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -140,6 +141,7 @@ connector_example_values() ->
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> tdengine.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -37,4 +37,4 @@
"The " ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if `[:Port]` is not specified." "The " ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if `[:Port]` is not specified."
). ).
-define(CONNECTOR_RESOURCE_GROUP, <<"emqx_connector">>). -define(CONNECTOR_RESOURCE_GROUP, <<"connector">>).

View File

@ -18,6 +18,7 @@
-include("../../emqx_bridge/include/emqx_bridge_resource.hrl"). -include("../../emqx_bridge/include/emqx_bridge_resource.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-include("emqx_connector.hrl").
-export([ -export([
connector_to_resource_type/1, connector_to_resource_type/1,
@ -126,7 +127,7 @@ create(Type, Name, Conf0, Opts) ->
Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, Conf = Conf0#{connector_type => TypeBin, connector_name => Name},
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
ResourceId, ResourceId,
<<"emqx_connector">>, ?CONNECTOR_RESOURCE_GROUP,
?MODULE:connector_to_resource_type(Type), ?MODULE:connector_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf), parse_confs(TypeBin, Name, Conf),
parse_opts(Conf, Opts) parse_opts(Conf, Opts)
@ -208,7 +209,7 @@ create_dry_run(Type, Conf) ->
create_dry_run(Type, Conf, fun(_) -> ok end). create_dry_run(Type, Conf, fun(_) -> ok end).
create_dry_run(Type, Conf0, Callback) -> create_dry_run(Type, Conf0, Callback) ->
%% Already typechecked, no need to catch errors %% Already type checked, no need to catch errors
TypeBin = bin(Type), TypeBin = bin(Type),
TypeAtom = safe_atom(Type), TypeAtom = safe_atom(Type),
%% We use a fixed name here to avoid creating an atom %% We use a fixed name here to avoid creating an atom

View File

@ -50,6 +50,7 @@ t_connector_lifecycle({init, Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok), meck:expect(?CONNECTOR, on_stop, 2, ok),
@ -171,6 +172,7 @@ t_remove_fail({'init', Config}) ->
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{enable => true}}]), meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{enable => true}}]),
meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
@ -234,6 +236,7 @@ t_create_with_bad_name_direct_path({init, Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok), meck:expect(?CONNECTOR, on_stop, 2, ok),
@ -265,6 +268,7 @@ t_create_with_bad_name_root_path({init, Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok), meck:expect(?CONNECTOR, on_stop, 2, ok),
@ -299,6 +303,7 @@ t_no_buffer_workers({'init', Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_get_channels, 1, []), meck:expect(?CONNECTOR, on_get_channels, 1, []),

View File

@ -224,6 +224,7 @@ init_mocks(_TestCase) ->
meck:new(emqx_connector_resource, [passthrough, no_link]), meck:new(emqx_connector_resource, [passthrough, no_link]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL),
meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy),
meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible),
meck:expect( meck:expect(
?CONNECTOR_IMPL, ?CONNECTOR_IMPL,

View File

@ -15,8 +15,10 @@
%% this module is only intended to be mocked %% this module is only intended to be mocked
-module(emqx_connector_dummy_impl). -module(emqx_connector_dummy_impl).
-behavior(emqx_resource).
-export([ -export([
resource_type/0,
query_mode/1, query_mode/1,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
@ -25,6 +27,7 @@
on_get_channel_status/3 on_get_channel_status/3
]). ]).
resource_type() -> dummy.
query_mode(_) -> error(unexpected). query_mode(_) -> error(unexpected).
callback_mode() -> error(unexpected). callback_mode() -> error(unexpected).
on_start(_, _) -> error(unexpected). on_start(_, _) -> error(unexpected).

View File

@ -45,7 +45,7 @@
-define(MOD_TAB, emqx_dashboard_sso). -define(MOD_TAB, emqx_dashboard_sso).
-define(MOD_KEY_PATH, [dashboard, sso]). -define(MOD_KEY_PATH, [dashboard, sso]).
-define(MOD_KEY_PATH(Sub), [dashboard, sso, Sub]). -define(MOD_KEY_PATH(Sub), [dashboard, sso, Sub]).
-define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). -define(RESOURCE_GROUP, <<"dashboard_sso">>).
-define(NO_ERROR, <<>>). -define(NO_ERROR, <<>>).
-define(DEFAULT_RESOURCE_OPTS, #{ -define(DEFAULT_RESOURCE_OPTS, #{
start_after_created => false start_after_created => false

View File

@ -24,7 +24,7 @@
-define(MOD_TAB, emqx_dashboard_sso). -define(MOD_TAB, emqx_dashboard_sso).
-define(MOD_KEY_PATH, [dashboard, sso, ldap]). -define(MOD_KEY_PATH, [dashboard, sso, ldap]).
-define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). -define(RESOURCE_GROUP, <<"dashboard_sso">>).
-import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1, request_api/3]). -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1, request_api/3]).

View File

@ -1,6 +1,6 @@
{application, emqx_ldap, [ {application, emqx_ldap, [
{description, "EMQX LDAP Connector"}, {description, "EMQX LDAP Connector"},
{vsn, "0.1.8"}, {vsn, "0.1.9"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -27,6 +27,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -129,6 +130,8 @@ ensure_username(Field) ->
emqx_connector_schema_lib:username(Field). emqx_connector_schema_lib:username(Field).
%% =================================================================== %% ===================================================================
resource_type() -> ldap.
callback_mode() -> always_sync. callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.

View File

@ -1,6 +1,6 @@
{application, emqx_mongodb, [ {application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"}, {description, "EMQX MongoDB Connector"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -26,6 +26,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -172,6 +173,7 @@ desc(_) ->
undefined. undefined.
%% =================================================================== %% ===================================================================
resource_type() -> mongodb.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_mysql, [ {application, emqx_mysql, [
{description, "EMQX MySQL Database Connector"}, {description, "EMQX MySQL Database Connector"},
{vsn, "0.1.9"}, {vsn, "0.2.0"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -25,6 +25,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -91,6 +92,8 @@ server() ->
emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS).
%% =================================================================== %% ===================================================================
resource_type() -> mysql.
callback_mode() -> always_sync. callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.

View File

@ -1,6 +1,6 @@
{application, emqx_oracle, [ {application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"}, {description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -21,6 +21,7 @@
%% callbacks for behaviour emqx_resource %% callbacks for behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -67,6 +68,8 @@
batch_params_tokens := params_tokens() batch_params_tokens := params_tokens()
}. }.
resource_type() -> oracle.
% As ecpool is not monitoring the worker's PID when doing a handover_async, the % As ecpool is not monitoring the worker's PID when doing a handover_async, the
% request can be lost if worker crashes. Thus, it's better to force requests to % request can be lost if worker crashes. Thus, it's better to force requests to
% be sync for now. % be sync for now.

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [ {application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"}, {description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -29,6 +29,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -120,6 +121,8 @@ adjust_fields(Fields) ->
). ).
%% =================================================================== %% ===================================================================
resource_type() -> pgsql.
callback_mode() -> always_sync. callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.

View File

@ -28,6 +28,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -119,6 +120,8 @@ redis_type(Type) ->
desc => ?DESC(Type) desc => ?DESC(Type)
}}. }}.
resource_type() -> redis.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, Config0) -> on_start(InstId, Config0) ->

View File

@ -23,7 +23,8 @@
%% remind us of that. %% remind us of that.
-define(rm_status_stopped, stopped). -define(rm_status_stopped, stopped).
-type resource_type() :: module(). -type resource_type() :: atom().
-type resource_module() :: module().
-type resource_id() :: binary(). -type resource_id() :: binary().
-type channel_id() :: binary(). -type channel_id() :: binary().
-type raw_resource_config() :: binary() | raw_term_resource_config(). -type raw_resource_config() :: binary() | raw_term_resource_config().
@ -158,5 +159,12 @@
%% See `hocon_tconf` %% See `hocon_tconf`
-define(TEST_ID_PREFIX, "t_probe_"). -define(TEST_ID_PREFIX, "t_probe_").
-define(RES_METRICS, resource_metrics). -define(RES_METRICS, resource_metrics).
-define(LOG_LEVEL(_L_),
case _L_ of
true -> info;
false -> warning
end
).
-define(TAG, "RESOURCE").
-define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations).

View File

@ -39,12 +39,10 @@
-export([ -export([
%% store the config and start the instance %% store the config and start the instance
create_local/4,
create_local/5, create_local/5,
create_dry_run_local/2, create_dry_run_local/2,
create_dry_run_local/3, create_dry_run_local/3,
create_dry_run_local/4, create_dry_run_local/4,
recreate_local/3,
recreate_local/4, recreate_local/4,
%% remove the config and stop the instance %% remove the config and stop the instance
remove_local/1, remove_local/1,
@ -98,6 +96,7 @@
-export([ -export([
%% get the callback mode of a specific module %% get the callback mode of a specific module
get_callback_mode/1, get_callback_mode/1,
get_resource_type/1,
%% start the instance %% start the instance
call_start/3, call_start/3,
%% verify if the resource is working normally %% verify if the resource is working normally
@ -140,6 +139,8 @@
validate_name/1 validate_name/1
]). ]).
-export([is_dry_run/1]).
-export_type([ -export_type([
query_mode/0, query_mode/0,
resource_id/0, resource_id/0,
@ -279,16 +280,10 @@ is_resource_mod(Module) ->
%% ================================================================================= %% =================================================================================
%% APIs for resource instances %% APIs for resource instances
%% ================================================================================= %% =================================================================================
-spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create_local(ResId, Group, ResourceType, Config) ->
create_local(ResId, Group, ResourceType, Config, #{}).
-spec create_local( -spec create_local(
resource_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_module(),
resource_config(), resource_config(),
creation_opts() creation_opts()
) -> ) ->
@ -296,7 +291,7 @@ create_local(ResId, Group, ResourceType, Config) ->
create_local(ResId, Group, ResourceType, Config, Opts) -> create_local(ResId, Group, ResourceType, Config, Opts) ->
emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts). emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts).
-spec create_dry_run_local(resource_type(), resource_config()) -> -spec create_dry_run_local(resource_module(), resource_config()) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
create_dry_run_local(ResourceType, Config) -> create_dry_run_local(ResourceType, Config) ->
emqx_resource_manager:create_dry_run(ResourceType, Config). emqx_resource_manager:create_dry_run(ResourceType, Config).
@ -304,19 +299,21 @@ create_dry_run_local(ResourceType, Config) ->
create_dry_run_local(ResId, ResourceType, Config) -> create_dry_run_local(ResId, ResourceType, Config) ->
emqx_resource_manager:create_dry_run(ResId, ResourceType, Config). emqx_resource_manager:create_dry_run(ResId, ResourceType, Config).
-spec create_dry_run_local(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> -spec create_dry_run_local(
resource_id(),
resource_module(),
resource_config(),
OnReadyCallback
) ->
ok | {error, Reason :: term()} ok | {error, Reason :: term()}
when when
OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}).
create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) -> create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) ->
emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback). emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback).
-spec recreate_local(resource_id(), resource_type(), resource_config()) -> -spec recreate_local(
{ok, resource_data()} | {error, Reason :: term()}. resource_id(), resource_module(), resource_config(), creation_opts()
recreate_local(ResId, ResourceType, Config) -> ) ->
recreate_local(ResId, ResourceType, Config, #{}).
-spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate_local(ResId, ResourceType, Config, Opts) -> recreate_local(ResId, ResourceType, Config, Opts) ->
emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts).
@ -330,11 +327,15 @@ remove_local(ResId) ->
ok; ok;
Error -> Error ->
%% Only log, the ResId worker is always removed in manager's remove action. %% Only log, the ResId worker is always removed in manager's remove action.
?SLOG(warning, #{ ?SLOG(
msg => "remove_local_resource_failed", warning,
error => Error, #{
resource_id => ResId msg => "remove_resource_failed",
}), error => Error,
resource_id => ResId
},
#{tag => ?TAG}
),
ok ok
end. end.
@ -487,6 +488,10 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
get_callback_mode(Mod) -> get_callback_mode(Mod) ->
Mod:callback_mode(). Mod:callback_mode().
-spec get_resource_type(module()) -> resource_type().
get_resource_type(Mod) ->
Mod:resource_type().
-spec call_start(resource_id(), module(), resource_config()) -> -spec call_start(resource_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
call_start(ResId, Mod, Config) -> call_start(ResId, Mod, Config) ->
@ -599,7 +604,7 @@ query_mode(Mod, Config, Opts) ->
maps:get(query_mode, Opts, sync) maps:get(query_mode, Opts, sync)
end. end.
-spec check_config(resource_type(), raw_resource_config()) -> -spec check_config(resource_module(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}. {ok, resource_config()} | {error, term()}.
check_config(ResourceType, Conf) -> check_config(ResourceType, Conf) ->
emqx_hocon:check(ResourceType, Conf). emqx_hocon:check(ResourceType, Conf).
@ -607,7 +612,7 @@ check_config(ResourceType, Conf) ->
-spec check_and_create_local( -spec check_and_create_local(
resource_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_module(),
raw_resource_config() raw_resource_config()
) -> ) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
@ -617,7 +622,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig) ->
-spec check_and_create_local( -spec check_and_create_local(
resource_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_module(),
raw_resource_config(), raw_resource_config(),
creation_opts() creation_opts()
) -> {ok, resource_data()} | {error, term()}. ) -> {ok, resource_data()} | {error, term()}.
@ -630,7 +635,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
-spec check_and_recreate_local( -spec check_and_recreate_local(
resource_id(), resource_id(),
resource_type(), resource_module(),
raw_resource_config(), raw_resource_config(),
creation_opts() creation_opts()
) -> ) ->
@ -769,6 +774,13 @@ validate_name(Name) ->
_ = validate_name(Name, #{atom_name => false}), _ = validate_name(Name, #{atom_name => false}),
ok. ok.
-spec is_dry_run(resource_id()) -> boolean().
is_dry_run(ResId) ->
case string:find(ResId, ?TEST_ID_PREFIX) of
nomatch -> false;
TestIdStart -> string:equal(TestIdStart, ResId)
end.
validate_name(<<>>, _Opts) -> validate_name(<<>>, _Opts) ->
invalid_data("Name cannot be empty string"); invalid_data("Name cannot be empty string");
validate_name(Name, _Opts) when size(Name) >= 255 -> validate_name(Name, _Opts) when size(Name) >= 255 ->

View File

@ -75,6 +75,7 @@
-record(data, { -record(data, {
id, id,
group, group,
type,
mod, mod,
callback_mode, callback_mode,
query_mode, query_mode,
@ -163,7 +164,7 @@
-spec ensure_resource( -spec ensure_resource(
resource_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_module(),
resource_config(), resource_config(),
creation_opts() creation_opts()
) -> {ok, resource_data()}. ) -> {ok, resource_data()}.
@ -176,7 +177,9 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
end. end.
%% @doc Called from emqx_resource when recreating a resource which may or may not exist %% @doc Called from emqx_resource when recreating a resource which may or may not exist
-spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> -spec recreate(
resource_id(), resource_module(), resource_config(), creation_opts()
) ->
{ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
recreate(ResId, ResourceType, NewConfig, Opts) -> recreate(ResId, ResourceType, NewConfig, Opts) ->
case lookup(ResId) of case lookup(ResId) of
@ -219,8 +222,8 @@ create(ResId, Group, ResourceType, Config, Opts) ->
%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
%% %%
%% Triggers the `emqx_resource_manager_sup` supervisor to actually create %% Triggers the `emqx_resource_manager_sup` supervisor to actually create
%% and link the process itself if not already started, and then immedately stops. %% and link the process itself if not already started, and then immediately stops.
-spec create_dry_run(resource_type(), resource_config()) -> -spec create_dry_run(resource_module(), resource_config()) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) -> create_dry_run(ResourceType, Config) ->
ResId = make_test_id(), ResId = make_test_id(),
@ -232,7 +235,9 @@ create_dry_run(ResId, ResourceType, Config) ->
do_nothing_on_ready(_ResId) -> do_nothing_on_ready(_ResId) ->
ok. ok.
-spec create_dry_run(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> -spec create_dry_run(
resource_id(), resource_module(), resource_config(), OnReadyCallback
) ->
ok | {error, Reason :: term()} ok | {error, Reason :: term()}
when when
OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}).
@ -242,7 +247,9 @@ create_dry_run(ResId, ResourceType, Config, OnReadyCallback) ->
true -> maps:get(resource_opts, Config, #{}); true -> maps:get(resource_opts, Config, #{});
false -> #{} false -> #{}
end, end,
ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts), ok = emqx_resource_manager_sup:ensure_child(
ResId, <<"dry_run">>, ResourceType, Config, Opts
),
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000), Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000),
case wait_for_ready(ResId, Timeout) of case wait_for_ready(ResId, Timeout) of
@ -499,6 +506,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) ->
), ),
Data = #data{ Data = #data{
id = ResId, id = ResId,
type = emqx_resource:get_resource_type(ResourceType),
group = Group, group = Group,
mod = ResourceType, mod = ResourceType,
callback_mode = emqx_resource:get_callback_mode(ResourceType), callback_mode = emqx_resource:get_callback_mode(ResourceType),
@ -683,11 +691,13 @@ handle_event(EventType, EventData, State, Data) ->
error, error,
#{ #{
msg => "ignore_all_other_events", msg => "ignore_all_other_events",
resource_id => Data#data.id,
event_type => EventType, event_type => EventType,
event_data => EventData, event_data => EventData,
state => State, state => State,
data => emqx_utils:redact(Data) data => emqx_utils:redact(Data)
} },
#{tag => tag(Data#data.group, Data#data.type)}
), ),
keep_state_and_data. keep_state_and_data.
@ -752,7 +762,8 @@ handle_remove_event(From, ClearMetrics, Data) ->
start_resource(Data, From) -> start_resource(Data, From) ->
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of #data{id = ResId, mod = Mod, config = Config, group = Group, type = Type} = Data,
case emqx_resource:call_start(ResId, Mod, Config) of
{ok, ResourceState} -> {ok, ResourceState} ->
UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState}, UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
%% Perform an initial health_check immediately before transitioning into a connected state %% Perform an initial health_check immediately before transitioning into a connected state
@ -760,12 +771,17 @@ start_resource(Data, From) ->
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
{next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions}; {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions};
{error, Reason} = Err -> {error, Reason} = Err ->
?SLOG(warning, #{ IsDryRun = emqx_resource:is_dry_run(ResId),
msg => "start_resource_failed", ?SLOG(
id => Data#data.id, log_level(IsDryRun),
reason => Reason #{
}), msg => "start_resource_failed",
_ = maybe_alarm(?status_disconnected, Data#data.id, Err, Data#data.error), resource_id => ResId,
reason => Reason
},
#{tag => tag(Group, Type)}
),
_ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error),
%% Add channels and raise alarms %% Add channels and raise alarms
NewData1 = channels_health_check(?status_disconnected, add_channels(Data)), NewData1 = channels_health_check(?status_disconnected, add_channels(Data)),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
@ -796,13 +812,20 @@ add_channels(Data) ->
add_channels_in_list([], Data) -> add_channels_in_list([], Data) ->
Data; Data;
add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
#data{
id = ResId,
mod = Mod,
state = State,
added_channels = AddedChannelsMap,
group = Group,
type = Type
} = Data,
case case
emqx_resource:call_add_channel( emqx_resource:call_add_channel(
Data#data.id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig ResId, Mod, State, ChannelID, ChannelConfig
) )
of of
{ok, NewState} -> {ok, NewState} ->
AddedChannelsMap = Data#data.added_channels,
%% Set the channel status to connecting to indicate that %% Set the channel status to connecting to indicate that
%% we have not yet performed the initial health_check %% we have not yet performed the initial health_check
NewAddedChannelsMap = maps:put( NewAddedChannelsMap = maps:put(
@ -816,12 +839,17 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
}, },
add_channels_in_list(Rest, NewData); add_channels_in_list(Rest, NewData);
{error, Reason} = Error -> {error, Reason} = Error ->
?SLOG(warning, #{ IsDryRun = emqx_resource:is_dry_run(ResId),
msg => add_channel_failed, ?SLOG(
id => Data#data.id, log_level(IsDryRun),
channel_id => ChannelID, #{
reason => Reason msg => "add_channel_failed",
}), resource_id => ResId,
channel_id => ChannelID,
reason => Reason
},
#{tag => tag(Group, Type)}
),
AddedChannelsMap = Data#data.added_channels, AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put( NewAddedChannelsMap = maps:put(
ChannelID, ChannelID,
@ -832,7 +860,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
%% Raise an alarm since the channel could not be added %% Raise an alarm since the channel could not be added
_ = maybe_alarm(?status_disconnected, ChannelID, Error, no_prev_error), _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error),
add_channels_in_list(Rest, NewData) add_channels_in_list(Rest, NewData)
end. end.
@ -856,7 +884,8 @@ stop_resource(#data{id = ResId} = Data) ->
false -> false ->
ok ok
end, end,
_ = maybe_clear_alarm(ResId), IsDryRun = emqx_resource:is_dry_run(ResId),
_ = maybe_clear_alarm(IsDryRun, ResId),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
NewData#data{status = ?rm_status_stopped}. NewData#data{status = ?rm_status_stopped}.
@ -867,16 +896,24 @@ remove_channels(Data) ->
remove_channels_in_list([], Data, _KeepInChannelMap) -> remove_channels_in_list([], Data, _KeepInChannelMap) ->
Data; Data;
remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
AddedChannelsMap = Data#data.added_channels, #data{
id = ResId,
added_channels = AddedChannelsMap,
mod = Mod,
state = State,
group = Group,
type = Type
} = Data,
IsDryRun = emqx_resource:is_dry_run(ResId),
NewAddedChannelsMap = NewAddedChannelsMap =
case KeepInChannelMap of case KeepInChannelMap of
true -> true ->
AddedChannelsMap; AddedChannelsMap;
false -> false ->
_ = maybe_clear_alarm(ChannelID), _ = maybe_clear_alarm(IsDryRun, ChannelID),
maps:remove(ChannelID, AddedChannelsMap) maps:remove(ChannelID, AddedChannelsMap)
end, end,
case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of case safe_call_remove_channel(ResId, Mod, State, ChannelID) of
{ok, NewState} -> {ok, NewState} ->
NewData = Data#data{ NewData = Data#data{
state = NewState, state = NewState,
@ -884,12 +921,18 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
}, },
remove_channels_in_list(Rest, NewData, KeepInChannelMap); remove_channels_in_list(Rest, NewData, KeepInChannelMap);
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ ?SLOG(
msg => remove_channel_failed, log_level(IsDryRun),
id => Data#data.id, #{
channel_id => ChannelID, msg => "remove_channel_failed",
reason => Reason resource_id => ResId,
}), group => Group,
type => Type,
channel_id => ChannelID,
reason => Reason
},
#{tag => tag(Group, Type)}
),
NewData = Data#data{ NewData = Data#data{
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
@ -968,8 +1011,8 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
handle_remove_channel(From, ChannelId, Data) -> handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
%% Deactivate alarm IsDryRun = emqx_resource:is_dry_run(Data#data.id),
_ = maybe_clear_alarm(ChannelId), _ = maybe_clear_alarm(IsDryRun, ChannelId),
case case
channel_status_is_channel_added( channel_status_is_channel_added(
maps:get(ChannelId, Channels, channel_status_not_added(undefined)) maps:get(ChannelId, Channels, channel_status_not_added(undefined))
@ -990,13 +1033,18 @@ handle_remove_channel(From, ChannelId, Data) ->
end. end.
handle_remove_channel_exists(From, ChannelId, Data) -> handle_remove_channel_exists(From, ChannelId, Data) ->
#data{
id = Id,
group = Group,
type = Type,
added_channels = AddedChannelsMap
} = Data,
case case
emqx_resource:call_remove_channel( emqx_resource:call_remove_channel(
Data#data.id, Data#data.mod, Data#data.state, ChannelId Id, Data#data.mod, Data#data.state, ChannelId
) )
of of
{ok, NewState} -> {ok, NewState} ->
AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:remove(ChannelId, AddedChannelsMap), NewAddedChannelsMap = maps:remove(ChannelId, AddedChannelsMap),
UpdatedData = Data#data{ UpdatedData = Data#data{
state = NewState, state = NewState,
@ -1004,13 +1052,17 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
}, },
{keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]}; {keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]};
{error, Reason} = Error -> {error, Reason} = Error ->
%% Log the error as a warning IsDryRun = emqx_resource:is_dry_run(Id),
?SLOG(warning, #{ ?SLOG(
msg => remove_channel_failed, log_level(IsDryRun),
id => Data#data.id, #{
channel_id => ChannelId, msg => "remove_channel_failed",
reason => Reason resource_id => Id,
}), channel_id => ChannelId,
reason => Reason
},
#{tag => tag(Group, Type)}
),
{keep_state_and_data, [{reply, From, Error}]} {keep_state_and_data, [{reply, From, Error}]}
end. end.
@ -1021,7 +1073,8 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
NewChannels = maps:remove(ChannelId, Channels), NewChannels = maps:remove(ChannelId, Channels),
NewData = Data#data{added_channels = NewChannels}, NewData = Data#data{added_channels = NewChannels},
_ = maybe_clear_alarm(ChannelId), IsDryRun = emqx_resource:is_dry_run(Data#data.id),
_ = maybe_clear_alarm(IsDryRun, ChannelId),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}. {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when
@ -1090,7 +1143,8 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
error = PrevError error = PrevError
} = Data0, } = Data0,
{NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0), {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0),
_ = maybe_alarm(NewStatus, ResId, Err, PrevError), IsDryRun = emqx_resource:is_dry_run(ResId),
_ = maybe_alarm(NewStatus, IsDryRun, ResId, Err, PrevError),
ok = maybe_resume_resource_workers(ResId, NewStatus), ok = maybe_resume_resource_workers(ResId, NewStatus),
Data1 = Data0#data{ Data1 = Data0#data{
state = NewState, status = NewStatus, error = Err state = NewState, status = NewStatus, error = Err
@ -1114,11 +1168,17 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
Actions = Replies ++ resource_health_check_actions(Data), Actions = Replies ++ resource_health_check_actions(Data),
{keep_state, Data, Actions}; {keep_state, Data, Actions};
_ -> _ ->
?SLOG(warning, #{ #data{id = ResId, group = Group, type = Type} = Data0,
msg => "health_check_failed", IsDryRun = emqx_resource:is_dry_run(ResId),
id => Data0#data.id, ?SLOG(
status => NewStatus log_level(IsDryRun),
}), #{
msg => "health_check_failed",
resource_id => ResId,
status => NewStatus
},
#{tag => tag(Group, Type)}
),
%% Note: works because, coincidentally, channel/resource status is a %% Note: works because, coincidentally, channel/resource status is a
%% subset of resource manager state... But there should be a conversion %% subset of resource manager state... But there should be a conversion
%% between the two here, as resource manager also has `stopped', which is %% between the two here, as resource manager also has `stopped', which is
@ -1214,7 +1274,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
%% Whenever the resource is connecting: %% Whenever the resource is connecting:
%% 1. Change the status of all added channels to connecting %% 1. Change the status of all added channels to connecting
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) %% 2. Raise alarms
Channels = Data0#data.added_channels, Channels = Data0#data.added_channels,
ChannelsToChangeStatusFor = [ ChannelsToChangeStatusFor = [
{ChannelId, Config} {ChannelId, Config}
@ -1240,9 +1300,10 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
|| {ChannelId, NewStatus} <- maps:to_list(NewChannels) || {ChannelId, NewStatus} <- maps:to_list(NewChannels)
], ],
%% Raise alarms for all channels %% Raise alarms for all channels
IsDryRun = emqx_resource:is_dry_run(Data0#data.id),
lists:foreach( lists:foreach(
fun({ChannelId, Status, PrevStatus}) -> fun({ChannelId, Status, PrevStatus}) ->
maybe_alarm(?status_connecting, ChannelId, Status, PrevStatus) maybe_alarm(?status_connecting, IsDryRun, ChannelId, Status, PrevStatus)
end, end,
ChannelsWithNewAndPrevErrorStatuses ChannelsWithNewAndPrevErrorStatuses
), ),
@ -1275,9 +1336,10 @@ channels_health_check(ConnectorStatus, Data0) ->
|| {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels) || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels)
], ],
%% Raise alarms %% Raise alarms
IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
_ = lists:foreach( _ = lists:foreach(
fun({ChannelId, OldStatus, NewStatus}) -> fun({ChannelId, OldStatus, NewStatus}) ->
_ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus) _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus)
end, end,
ChannelsWithNewAndOldStatuses ChannelsWithNewAndOldStatuses
), ),
@ -1386,13 +1448,14 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta
NewStatus = maps:get(ChannelId, Data1#data.added_channels), NewStatus = maps:get(ChannelId, Data1#data.added_channels),
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
Data = remove_channels_in_list(ChannelsToRemove, Data1, true), Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
%% Raise/clear alarms %% Raise/clear alarms
case NewStatus of case NewStatus of
#{status := ?status_connected} -> #{status := ?status_connected} ->
_ = maybe_clear_alarm(ChannelId), _ = maybe_clear_alarm(IsDryRun, ChannelId),
ok; ok;
_ -> _ ->
_ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus), _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus),
ok ok
end, end,
Data. Data.
@ -1556,15 +1619,21 @@ remove_runtime_data(#data{} = Data0) ->
health_check_interval(Opts) -> health_check_interval(Opts) ->
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
-spec maybe_alarm(resource_status(), resource_id(), _Error :: term(), _PrevError :: term()) -> ok. -spec maybe_alarm(
maybe_alarm(?status_connected, _ResId, _Error, _PrevError) -> resource_status(),
boolean(),
resource_id(),
_Error :: term(),
_PrevError :: term()
) -> ok.
maybe_alarm(?status_connected, _IsDryRun, _ResId, _Error, _PrevError) ->
ok; ok;
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error, _PrevError) -> maybe_alarm(_Status, true, _ResId, _Error, _PrevError) ->
ok; ok;
%% Assume that alarm is already active %% Assume that alarm is already active
maybe_alarm(_Status, _ResId, Error, Error) -> maybe_alarm(_Status, _IsDryRun, _ResId, Error, Error) ->
ok; ok;
maybe_alarm(_Status, ResId, Error, _PrevError) -> maybe_alarm(_Status, false, ResId, Error, _PrevError) ->
HrError = HrError =
case Error of case Error of
{error, undefined} -> {error, undefined} ->
@ -1596,10 +1665,10 @@ maybe_resume_resource_workers(ResId, ?status_connected) ->
maybe_resume_resource_workers(_, _) -> maybe_resume_resource_workers(_, _) ->
ok. ok.
-spec maybe_clear_alarm(resource_id()) -> ok | {error, not_found}. -spec maybe_clear_alarm(boolean(), resource_id()) -> ok | {error, not_found}.
maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) -> maybe_clear_alarm(true, _ResId) ->
ok; ok;
maybe_clear_alarm(ResId) -> maybe_clear_alarm(false, ResId) ->
emqx_alarm:safe_deactivate(ResId). emqx_alarm:safe_deactivate(ResId).
parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
@ -1615,7 +1684,8 @@ parse_health_check_result({error, Error}, Data) ->
msg => "health_check_exception", msg => "health_check_exception",
resource_id => Data#data.id, resource_id => Data#data.id,
reason => Error reason => Error
} },
#{tag => tag(Data#data.group, Data#data.type)}
), ),
{?status_disconnected, Data#data.state, {error, Error}}. {?status_disconnected, Data#data.state, {error, Error}}.
@ -1767,10 +1837,18 @@ add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) ->
ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig), ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig),
NewChannels = maps:put(ChannelId, ChannelStatus, Channels), NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
ResStatus = state_to_status(State), ResStatus = state_to_status(State),
maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), IsDryRun = emqx_resource:is_dry_run(ChannelId),
maybe_alarm(ResStatus, IsDryRun, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels}. Data#data{added_channels = NewChannels}.
state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_stopped) -> ?rm_status_stopped;
state_to_status(?state_connected) -> ?status_connected; state_to_status(?state_connected) -> ?status_connected;
state_to_status(?state_connecting) -> ?status_connecting; state_to_status(?state_connecting) -> ?status_connecting;
state_to_status(?state_disconnected) -> ?status_disconnected. state_to_status(?state_disconnected) -> ?status_disconnected.
log_level(true) -> info;
log_level(false) -> warning.
tag(Group, Type) ->
Str = emqx_utils_conv:str(Group) ++ "/" ++ emqx_utils_conv:str(Type),
string:uppercase(Str).

View File

@ -58,10 +58,11 @@ init([]) ->
child_spec(ResId, Group, ResourceType, Config, Opts) -> child_spec(ResId, Group, ResourceType, Config, Opts) ->
#{ #{
id => ResId, id => ResId,
start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, start =>
{emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]},
restart => transient, restart => transient,
%% never force kill a resource manager. %% never force kill a resource manager.
%% becasue otherwise it may lead to release leak, %% because otherwise it may lead to release leak,
%% resource_manager's terminate callback calls resource on_stop %% resource_manager's terminate callback calls resource on_stop
shutdown => infinity, shutdown => infinity,
type => worker, type => worker,

View File

@ -17,6 +17,7 @@
-module(emqx_resource_metrics). -module(emqx_resource_metrics).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_resource.hrl").
-export([ -export([
events/0, events/0,
@ -74,7 +75,6 @@
success_get/1 success_get/1
]). ]).
-define(RES_METRICS, resource_metrics).
-define(TELEMETRY_PREFIX, emqx, resource). -define(TELEMETRY_PREFIX, emqx, resource).
-spec events() -> [telemetry:event_name()]. -spec events() -> [telemetry:event_name()].
@ -127,15 +127,19 @@ handle_telemetry_event(
%% We catch errors to avoid detaching the telemetry handler function. %% We catch errors to avoid detaching the telemetry handler function.
%% When restarting a resource while it's under load, there might be transient %% When restarting a resource while it's under load, there might be transient
%% failures while the metrics are not yet created. %% failures while the metrics are not yet created.
?SLOG(warning, #{ ?SLOG(
msg => "handle_resource_metrics_failed", warning,
hint => "transient failures may occur when restarting a resource", #{
kind => Kind, msg => "handle_resource_metrics_failed",
reason => Reason, hint => "transient failures may occur when restarting a resource",
stacktrace => Stacktrace, kind => Kind,
resource_id => ID, reason => Reason,
event => Event stacktrace => Stacktrace,
}), resource_id => ID,
event => Event
},
#{tag => ?TAG}
),
ok ok
end; end;
handle_telemetry_event( handle_telemetry_event(
@ -151,15 +155,19 @@ handle_telemetry_event(
%% We catch errors to avoid detaching the telemetry handler function. %% We catch errors to avoid detaching the telemetry handler function.
%% When restarting a resource while it's under load, there might be transient %% When restarting a resource while it's under load, there might be transient
%% failures while the metrics are not yet created. %% failures while the metrics are not yet created.
?SLOG(warning, #{ ?SLOG(
msg => "handle_resource_metrics_failed", warning,
hint => "transient failures may occur when restarting a resource", #{
kind => Kind, msg => "handle_resource_metrics_failed",
reason => Reason, hint => "transient failures may occur when restarting a resource",
stacktrace => Stacktrace, kind => Kind,
resource_id => ID, reason => Reason,
event => Event stacktrace => Stacktrace,
}), resource_id => ID,
event => Event
},
#{tag => ?TAG}
),
ok ok
end; end;
handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->

View File

@ -26,6 +26,7 @@
]). ]).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_resource.hrl").
-ifndef(TEST). -ifndef(TEST).
-define(HEALTH_CHECK_TIMEOUT, 15000). -define(HEALTH_CHECK_TIMEOUT, 15000).
@ -37,33 +38,43 @@
start(Name, Mod, Options) -> start(Name, Mod, Options) ->
case ecpool:start_sup_pool(Name, Mod, Options) of case ecpool:start_sup_pool(Name, Mod, Options) of
{ok, _} -> {ok, _} ->
?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}), ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}, #{tag => ?TAG}),
ok; ok;
{error, {already_started, _Pid}} -> {error, {already_started, _Pid}} ->
stop(Name), stop(Name),
start(Name, Mod, Options); start(Name, Mod, Options);
{error, Reason} -> {error, Reason} ->
NReason = parse_reason(Reason), NReason = parse_reason(Reason),
?SLOG(error, #{ IsDryRun = emqx_resource:is_dry_run(Name),
msg => "start_ecpool_error", ?SLOG(
pool_name => Name, ?LOG_LEVEL(IsDryRun),
reason => NReason #{
}), msg => "start_ecpool_error",
resource_id => Name,
reason => NReason
},
#{tag => ?TAG}
),
{error, {start_pool_failed, Name, NReason}} {error, {start_pool_failed, Name, NReason}}
end. end.
stop(Name) -> stop(Name) ->
case ecpool:stop_sup_pool(Name) of case ecpool:stop_sup_pool(Name) of
ok -> ok ->
?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}); ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}, #{tag => ?TAG});
{error, not_found} -> {error, not_found} ->
ok; ok;
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ IsDryRun = emqx_resource:is_dry_run(Name),
msg => "stop_ecpool_failed", ?SLOG(
pool_name => Name, ?LOG_LEVEL(IsDryRun),
reason => Reason #{
}), msg => "stop_ecpool_failed",
resource_id => Name,
reason => Reason
},
#{tag => ?TAG}
),
error({stop_pool_failed, Name, Reason}) error({stop_pool_failed, Name, Reason})
end. end.

View File

@ -40,7 +40,7 @@ deprecated_since() ->
-spec create( -spec create(
resource_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_module(),
resource_config(), resource_config(),
creation_opts() creation_opts()
) -> ) ->
@ -51,7 +51,7 @@ create(ResId, Group, ResourceType, Config, Opts) ->
]). ]).
-spec create_dry_run( -spec create_dry_run(
resource_type(), resource_module(),
resource_config() resource_config()
) -> ) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
@ -60,7 +60,7 @@ create_dry_run(ResourceType, Config) ->
-spec recreate( -spec recreate(
resource_id(), resource_id(),
resource_type(), resource_module(),
resource_config(), resource_config(),
creation_opts() creation_opts()
) -> ) ->

View File

@ -24,6 +24,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -62,6 +63,8 @@ register(required) -> true;
register(default) -> false; register(default) -> false;
register(_) -> undefined. register(_) -> undefined.
resource_type() -> demo.
callback_mode() -> callback_mode() ->
persistent_term:get(?CM_KEY). persistent_term:get(?CM_KEY).

View File

@ -3490,7 +3490,7 @@ gauge_metric_set_fns() ->
]. ].
create(Id, Group, Type, Config) -> create(Id, Group, Type, Config) ->
emqx_resource:create_local(Id, Group, Type, Config). emqx_resource:create_local(Id, Group, Type, Config, #{}).
create(Id, Group, Type, Config, Opts) -> create(Id, Group, Type, Config, Opts) ->
emqx_resource:create_local(Id, Group, Type, Config, Opts). emqx_resource:create_local(Id, Group, Type, Config, Opts).

View File

@ -25,6 +25,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -40,6 +41,8 @@
]). ]).
%% =================================================================== %% ===================================================================
resource_type() -> test_connector.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start( on_start(