feat: add resource_type to emqx_resource behaviour

This commit is contained in:
zhongwencool 2024-07-23 16:15:57 +08:00
parent e74a921d33
commit e7d07ea17c
109 changed files with 248 additions and 211 deletions

View File

@ -21,8 +21,8 @@
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-export([ -export([
create_resource/4, create_resource/3,
update_resource/4, update_resource/3,
check_password_from_selected_map/3, check_password_from_selected_map/3,
parse_deep/1, parse_deep/1,
parse_str/1, parse_str/1,
@ -66,9 +66,8 @@
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create_resource(Type, ResourceId, Module, Config) -> create_resource(ResourceId, Module, Config) ->
Result = emqx_resource:create_local( Result = emqx_resource:create_local(
Type,
ResourceId, ResourceId,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
Module, Module,
@ -77,9 +76,9 @@ create_resource(Type, ResourceId, Module, Config) ->
), ),
start_resource_if_enabled(Result, ResourceId, Config). start_resource_if_enabled(Result, ResourceId, Config).
update_resource(Type, Module, Config, ResourceId) -> update_resource(Module, Config, ResourceId) ->
Result = emqx_resource:recreate_local( Result = emqx_resource:recreate_local(
Type, ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS
), ),
start_resource_if_enabled(Result, ResourceId, Config). start_resource_if_enabled(Result, ResourceId, Config).

View File

@ -25,9 +25,9 @@
-export([ -export([
cleanup_resources/0, cleanup_resources/0,
make_resource_id/1, make_resource_id/1,
create_resource/2,
create_resource/3, create_resource/3,
create_resource/4, update_resource/2,
update_resource/3,
remove_resource/1, remove_resource/1,
update_config/2, update_config/2,
parse_deep/2, parse_deep/2,
@ -57,13 +57,12 @@
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create_resource(Type, Module, Config) -> create_resource(Module, Config) ->
ResourceId = make_resource_id(Module), ResourceId = make_resource_id(Module),
create_resource(Type, ResourceId, Module, Config). create_resource(ResourceId, Module, Config).
create_resource(Type, ResourceId, Module, Config) -> create_resource(ResourceId, Module, Config) ->
Result = emqx_resource:create_local( Result = emqx_resource:create_local(
Type,
ResourceId, ResourceId,
?AUTHZ_RESOURCE_GROUP, ?AUTHZ_RESOURCE_GROUP,
Module, Module,
@ -72,11 +71,10 @@ create_resource(Type, ResourceId, Module, Config) ->
), ),
start_resource_if_enabled(Result, ResourceId, Config). start_resource_if_enabled(Result, ResourceId, Config).
update_resource(Type, Module, #{annotations := #{id := ResourceId}} = Config) -> update_resource(Module, #{annotations := #{id := ResourceId}} = Config) ->
Result = Result =
case case
emqx_resource:recreate_local( emqx_resource:recreate_local(
Type,
ResourceId, ResourceId,
Module, Module,
Config, Config,

View File

@ -40,7 +40,6 @@ create(Config0) ->
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
% {Config, State} = parse_config(Config0), % {Config, State} = parse_config(Config0),
{ok, _Data} = emqx_authn_utils:create_resource( {ok, _Data} = emqx_authn_utils:create_resource(
http,
ResourceId, ResourceId,
emqx_bridge_http_connector, emqx_bridge_http_connector,
Config Config
@ -51,9 +50,7 @@ create(Config0) ->
update(Config0, #{resource_id := ResourceId} = _State) -> update(Config0, #{resource_id := ResourceId} = _State) ->
with_validated_config(Config0, fun(Config, NState) -> with_validated_config(Config0, fun(Config, NState) ->
% {Config, NState} = parse_config(Config0), % {Config, NState} = parse_config(Config0),
case case emqx_authn_utils:update_resource(emqx_bridge_http_connector, Config, ResourceId) of
emqx_authn_utils:update_resource(http, emqx_bridge_http_connector, Config, ResourceId)
of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -67,13 +67,15 @@ create(Config) ->
NConfig = parse_config(Config), NConfig = parse_config(Config),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_authz_utils:create_resource( {ok, _Data} = emqx_authz_utils:create_resource(
http, ResourceId, emqx_bridge_http_connector, NConfig ResourceId,
emqx_bridge_http_connector,
NConfig
), ),
NConfig#{annotations => #{id => ResourceId}}. NConfig#{annotations => #{id => ResourceId}}.
update(Config) -> update(Config) ->
NConfig = parse_config(Config), NConfig = parse_config(Config),
case emqx_authz_utils:update_resource(http, emqx_bridge_http_connector, NConfig) of case emqx_authz_utils:update_resource(emqx_bridge_http_connector, NConfig) of
{error, Reason} -> error({load_config_error, Reason}); {error, Reason} -> error({load_config_error, Reason});
{ok, Id} -> NConfig#{annotations => #{id => Id}} {ok, Id} -> NConfig#{annotations => #{id => Id}}
end. end.

View File

@ -22,6 +22,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -32,6 +33,8 @@
-define(DEFAULT_POOL_SIZE, 8). -define(DEFAULT_POOL_SIZE, 8).
resource_type() -> jwks.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, Opts) -> on_start(InstId, Opts) ->

View File

@ -183,7 +183,6 @@ do_create(
) -> ) ->
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
jwt,
ResourceId, ResourceId,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
emqx_authn_jwks_connector, emqx_authn_jwks_connector,

View File

@ -40,12 +40,12 @@ create(_AuthenticatorID, Config) ->
do_create(Module, Config) -> do_create(Module, Config) ->
ResourceId = emqx_authn_utils:make_resource_id(Module), ResourceId = emqx_authn_utils:make_resource_id(Module),
State = parse_config(Config), State = parse_config(Config),
{ok, _Data} = emqx_authn_utils:create_resource(ldap, ResourceId, emqx_ldap, Config), {ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_ldap, Config),
{ok, State#{resource_id => ResourceId}}. {ok, State#{resource_id => ResourceId}}.
update(Config, #{resource_id := ResourceId} = _State) -> update(Config, #{resource_id := ResourceId} = _State) ->
NState = parse_config(Config), NState = parse_config(Config),
case emqx_authn_utils:update_resource(ldap, emqx_ldap, Config, ResourceId) of case emqx_authn_utils:update_resource(emqx_ldap, Config, ResourceId) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -56,12 +56,12 @@ description() ->
create(Source) -> create(Source) ->
ResourceId = emqx_authz_utils:make_resource_id(?MODULE), ResourceId = emqx_authz_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_authz_utils:create_resource(ldap, ResourceId, emqx_ldap, Source), {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_ldap, Source),
Annotations = new_annotations(#{id => ResourceId}, Source), Annotations = new_annotations(#{id => ResourceId}, Source),
Source#{annotations => Annotations}. Source#{annotations => Annotations}.
update(Source) -> update(Source) ->
case emqx_authz_utils:update_resource(ldap, emqx_ldap, Source) of case emqx_authz_utils:update_resource(emqx_ldap, Source) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, Id} -> {ok, Id} ->

View File

@ -47,7 +47,6 @@ init_per_suite(Config) ->
work_dir => ?config(priv_dir, Config) work_dir => ?config(priv_dir, Config)
}), }),
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
ldap,
?LDAP_RESOURCE, ?LDAP_RESOURCE,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
emqx_ldap, emqx_ldap,

View File

@ -47,7 +47,6 @@ init_per_suite(Config) ->
work_dir => ?config(priv_dir, Config) work_dir => ?config(priv_dir, Config)
}), }),
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
ldap,
?LDAP_RESOURCE, ?LDAP_RESOURCE,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
emqx_ldap, emqx_ldap,

View File

@ -178,7 +178,6 @@ stop_apps(Apps) ->
create_ldap_resource() -> create_ldap_resource() ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
ldap,
?LDAP_RESOURCE, ?LDAP_RESOURCE,
?AUTHZ_RESOURCE_GROUP, ?AUTHZ_RESOURCE_GROUP,
emqx_ldap, emqx_ldap,

View File

@ -37,7 +37,6 @@ create(Config0) ->
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{Config, State} = parse_config(Config0), {Config, State} = parse_config(Config0),
{ok, _Data} = emqx_authn_utils:create_resource( {ok, _Data} = emqx_authn_utils:create_resource(
mongodb,
ResourceId, ResourceId,
emqx_mongodb, emqx_mongodb,
Config Config
@ -46,7 +45,7 @@ create(Config0) ->
update(Config0, #{resource_id := ResourceId} = _State) -> update(Config0, #{resource_id := ResourceId} = _State) ->
{Config, NState} = parse_config(Config0), {Config, NState} = parse_config(Config0),
case emqx_authn_utils:update_resource(mongodb, emqx_mongodb, Config, ResourceId) of case emqx_authn_utils:update_resource(emqx_mongodb, Config, ResourceId) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -49,13 +49,13 @@ description() ->
create(#{filter := Filter} = Source) -> create(#{filter := Filter} = Source) ->
ResourceId = emqx_authz_utils:make_resource_id(?MODULE), ResourceId = emqx_authz_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_authz_utils:create_resource(mongodb, ResourceId, emqx_mongodb, Source), {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_mongodb, Source),
FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS), FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS),
Source#{annotations => #{id => ResourceId}, filter_template => FilterTemp}. Source#{annotations => #{id => ResourceId}, filter_template => FilterTemp}.
update(#{filter := Filter} = Source) -> update(#{filter := Filter} = Source) ->
FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS), FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS),
case emqx_authz_utils:update_resource(mongodb, emqx_mongodb, Source) of case emqx_authz_utils:update_resource(emqx_mongodb, Source) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, Id} -> {ok, Id} ->

View File

@ -39,12 +39,12 @@ create(_AuthenticatorID, Config) ->
create(Config0) -> create(Config0) ->
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{Config, State} = parse_config(Config0), {Config, State} = parse_config(Config0),
{ok, _Data} = emqx_authn_utils:create_resource(mysql, ResourceId, emqx_mysql, Config), {ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_mysql, Config),
{ok, State#{resource_id => ResourceId}}. {ok, State#{resource_id => ResourceId}}.
update(Config0, #{resource_id := ResourceId} = _State) -> update(Config0, #{resource_id := ResourceId} = _State) ->
{Config, NState} = parse_config(Config0), {Config, NState} = parse_config(Config0),
case emqx_authn_utils:update_resource(mysql, emqx_mysql, Config, ResourceId) of case emqx_authn_utils:update_resource(emqx_mysql, Config, ResourceId) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -53,13 +53,13 @@ create(#{query := SQL} = Source0) ->
{PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS), {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS),
ResourceId = emqx_authz_utils:make_resource_id(?MODULE), ResourceId = emqx_authz_utils:make_resource_id(?MODULE),
Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}},
{ok, _Data} = emqx_authz_utils:create_resource(mysql, ResourceId, emqx_mysql, Source), {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_mysql, Source),
Source#{annotations => #{id => ResourceId, tmpl_token => TmplToken}}. Source#{annotations => #{id => ResourceId, tmpl_token => TmplToken}}.
update(#{query := SQL} = Source0) -> update(#{query := SQL} = Source0) ->
{PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS), {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS),
Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}},
case emqx_authz_utils:update_resource(mysql, emqx_mysql, Source) of case emqx_authz_utils:update_resource(emqx_mysql, Source) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, Id} -> {ok, Id} ->

View File

@ -58,7 +58,6 @@ init_per_suite(Config) ->
work_dir => ?config(priv_dir, Config) work_dir => ?config(priv_dir, Config)
}), }),
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
mysql,
?MYSQL_RESOURCE, ?MYSQL_RESOURCE,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
emqx_mysql, emqx_mysql,

View File

@ -446,7 +446,6 @@ stop_apps(Apps) ->
create_mysql_resource() -> create_mysql_resource() ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
mysql,
?MYSQL_RESOURCE, ?MYSQL_RESOURCE,
?AUTHZ_RESOURCE_GROUP, ?AUTHZ_RESOURCE_GROUP,
emqx_mysql, emqx_mysql,

View File

@ -45,7 +45,6 @@ create(Config0) ->
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{Config, State} = parse_config(Config0, ResourceId), {Config, State} = parse_config(Config0, ResourceId),
{ok, _Data} = emqx_authn_utils:create_resource( {ok, _Data} = emqx_authn_utils:create_resource(
postgresql,
ResourceId, ResourceId,
emqx_postgresql, emqx_postgresql,
Config Config
@ -54,7 +53,7 @@ create(Config0) ->
update(Config0, #{resource_id := ResourceId} = _State) -> update(Config0, #{resource_id := ResourceId} = _State) ->
{Config, NState} = parse_config(Config0, ResourceId), {Config, NState} = parse_config(Config0, ResourceId),
case emqx_authn_utils:update_resource(postgresql, emqx_postgresql, Config, ResourceId) of case emqx_authn_utils:update_resource(emqx_postgresql, Config, ResourceId) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -53,7 +53,6 @@ create(#{query := SQL0} = Source) ->
{SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS), {SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS),
ResourceID = emqx_authz_utils:make_resource_id(emqx_postgresql), ResourceID = emqx_authz_utils:make_resource_id(emqx_postgresql),
{ok, _Data} = emqx_authz_utils:create_resource( {ok, _Data} = emqx_authz_utils:create_resource(
postgresql,
ResourceID, ResourceID,
emqx_postgresql, emqx_postgresql,
Source#{prepare_statement => #{ResourceID => SQL}} Source#{prepare_statement => #{ResourceID => SQL}}
@ -64,7 +63,6 @@ update(#{query := SQL0, annotations := #{id := ResourceID}} = Source) ->
{SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS), {SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS),
case case
emqx_authz_utils:update_resource( emqx_authz_utils:update_resource(
postgresql,
emqx_postgresql, emqx_postgresql,
Source#{prepare_statement => #{ResourceID => SQL}} Source#{prepare_statement => #{ResourceID => SQL}}
) )

View File

@ -79,7 +79,6 @@ init_per_suite(Config) ->
work_dir => ?config(priv_dir, Config) work_dir => ?config(priv_dir, Config)
}), }),
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
postgresql,
?PGSQL_RESOURCE, ?PGSQL_RESOURCE,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
emqx_postgresql, emqx_postgresql,
@ -199,13 +198,9 @@ test_user_auth(#{
t_authenticate_disabled_prepared_statements(_Config) -> t_authenticate_disabled_prepared_statements(_Config) ->
ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}), ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}),
{ok, _} = emqx_resource:recreate_local( {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig, #{}),
postgresql, ?PGSQL_RESOURCE, emqx_postgresql, ResConfig, #{}
),
on_exit(fun() -> on_exit(fun() ->
emqx_resource:recreate_local( emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config(), #{})
postgresql, ?PGSQL_RESOURCE, emqx_postgresql, pgsql_config(), #{}
)
end), end),
ok = lists:foreach( ok = lists:foreach(
fun(Sample0) -> fun(Sample0) ->

View File

@ -437,7 +437,6 @@ pgsql_config() ->
create_pgsql_resource() -> create_pgsql_resource() ->
emqx_resource:create_local( emqx_resource:create_local(
postgresql,
?PGSQL_RESOURCE, ?PGSQL_RESOURCE,
?AUTHZ_RESOURCE_GROUP, ?AUTHZ_RESOURCE_GROUP,
emqx_postgresql, emqx_postgresql,

View File

@ -42,7 +42,6 @@ create(Config0) ->
Res; Res;
{Config, State} -> {Config, State} ->
{ok, _Data} = emqx_authn_utils:create_resource( {ok, _Data} = emqx_authn_utils:create_resource(
redis,
ResourceId, ResourceId,
emqx_redis, emqx_redis,
Config Config
@ -52,7 +51,7 @@ create(Config0) ->
update(Config0, #{resource_id := ResourceId} = _State) -> update(Config0, #{resource_id := ResourceId} = _State) ->
{Config, NState} = parse_config(Config0), {Config, NState} = parse_config(Config0),
case emqx_authn_utils:update_resource(redis, emqx_redis, Config, ResourceId) of case emqx_authn_utils:update_resource(emqx_redis, Config, ResourceId) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -50,12 +50,12 @@ description() ->
create(#{cmd := CmdStr} = Source) -> create(#{cmd := CmdStr} = Source) ->
CmdTemplate = parse_cmd(CmdStr), CmdTemplate = parse_cmd(CmdStr),
ResourceId = emqx_authz_utils:make_resource_id(?MODULE), ResourceId = emqx_authz_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_authz_utils:create_resource(redis, ResourceId, emqx_redis, Source), {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_redis, Source),
Source#{annotations => #{id => ResourceId}, cmd_template => CmdTemplate}. Source#{annotations => #{id => ResourceId}, cmd_template => CmdTemplate}.
update(#{cmd := CmdStr} = Source) -> update(#{cmd := CmdStr} = Source) ->
CmdTemplate = parse_cmd(CmdStr), CmdTemplate = parse_cmd(CmdStr),
case emqx_authz_utils:update_resource(redis, emqx_redis, Source) of case emqx_authz_utils:update_resource(emqx_redis, Source) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, Id} -> {ok, Id} ->

View File

@ -63,7 +63,6 @@ init_per_suite(Config) ->
work_dir => ?config(priv_dir, Config) work_dir => ?config(priv_dir, Config)
}), }),
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
redis,
?REDIS_RESOURCE, ?REDIS_RESOURCE,
?AUTHN_RESOURCE_GROUP, ?AUTHN_RESOURCE_GROUP,
emqx_redis, emqx_redis,

View File

@ -384,7 +384,6 @@ stop_apps(Apps) ->
create_redis_resource() -> create_redis_resource() ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
redis,
?REDIS_RESOURCE, ?REDIS_RESOURCE,
?AUTHZ_RESOURCE_GROUP, ?AUTHZ_RESOURCE_GROUP,
emqx_redis, emqx_redis,

View File

@ -195,7 +195,6 @@ create(Type, Name, Conf0, Opts) ->
TypeBin = bin(Type), TypeBin = bin(Type),
Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name},
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
Type,
resource_id(Type, Name), resource_id(Type, Name),
<<"bridge">>, <<"bridge">>,
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
@ -265,7 +264,6 @@ recreate(Type, Name, Conf0, Opts) ->
TypeBin = bin(Type), TypeBin = bin(Type),
Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name},
emqx_resource:recreate_local( emqx_resource:recreate_local(
Type,
resource_id(Type, Name), resource_id(Type, Name),
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf), parse_confs(TypeBin, Name, Conf),
@ -302,7 +300,7 @@ create_dry_run_bridge_v1(Type, Conf0) ->
{error, Reason}; {error, Reason};
{ok, ConfNew} -> {ok, ConfNew} ->
ParseConf = parse_confs(TypeBin, TmpName, ConfNew), ParseConf = parse_confs(TypeBin, TmpName, ConfNew),
emqx_resource:create_dry_run_local(Type, bridge_to_resource_type(Type), ParseConf) emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf)
end end
catch catch
%% validation errors %% validation errors

View File

@ -1110,6 +1110,7 @@ t_query_uses_action_query_mode(_Config) ->
%% ... now we use a quite different query mode for the action %% ... now we use a quite different query mode for the action
meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
meck:expect(con_mod(), resource_type, 0, dummy),
meck:expect(con_mod(), callback_mode, 0, async_if_possible), meck:expect(con_mod(), callback_mode, 0, async_if_possible),
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),

View File

@ -293,6 +293,7 @@ init_mocks() ->
meck:new(emqx_connector_resource, [passthrough, no_link]), meck:new(emqx_connector_resource, [passthrough, no_link]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL),
meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy),
meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible),
meck:expect( meck:expect(
?CONNECTOR_IMPL, ?CONNECTOR_IMPL,

View File

@ -15,15 +15,17 @@
%% this module is only intended to be mocked %% this module is only intended to be mocked
-module(emqx_bridge_v2_dummy_connector). -module(emqx_bridge_v2_dummy_connector).
-behavior(emqx_resource).
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_add_channel/4, on_add_channel/4,
on_get_channel_status/3 on_get_channel_status/3
]). ]).
resource_type() -> dummy.
callback_mode() -> error(unexpected). callback_mode() -> error(unexpected).
on_start(_, _) -> error(unexpected). on_start(_, _) -> error(unexpected).
on_stop(_, _) -> error(unexpected). on_stop(_, _) -> error(unexpected).

View File

@ -19,6 +19,7 @@
-export([ -export([
query_mode/1, query_mode/1,
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -34,6 +35,8 @@
query_mode(_Config) -> query_mode(_Config) ->
sync. sync.
resource_type() -> test_connector.
callback_mode() -> callback_mode() ->
always_sync. always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_cassandra, [ {application, emqx_bridge_cassandra, [
{description, "EMQX Enterprise Cassandra Bridge"}, {description, "EMQX Enterprise Cassandra Bridge"},
{vsn, "0.3.1"}, {vsn, "0.3.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -19,6 +19,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -94,6 +95,7 @@ desc("connector") ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks for emqx_resource %% callbacks for emqx_resource
resource_type() -> cassandra.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -212,7 +212,6 @@ check_config(Config) ->
create_local_resource(ResourceId, CheckedConfig) -> create_local_resource(ResourceId, CheckedConfig) ->
{ok, Bridge} = {ok, Bridge} =
emqx_resource:create_local( emqx_resource:create_local(
cassandra,
ResourceId, ResourceId,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?CASSANDRA_RESOURCE_MOD, ?CASSANDRA_RESOURCE_MOD,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_clickhouse, [ {application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"}, {description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.4.1"}, {vsn, "0.4.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -29,6 +29,7 @@
%% callbacks for behaviour emqx_resource %% callbacks for behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -128,6 +129,7 @@ values(_) ->
%% =================================================================== %% ===================================================================
%% Callbacks defined in emqx_resource %% Callbacks defined in emqx_resource
%% =================================================================== %% ===================================================================
resource_type() -> clickhouse.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -109,7 +109,6 @@ t_start_passfile(Config) ->
?assertMatch( ?assertMatch(
{ok, #{status := connected}}, {ok, #{status := connected}},
emqx_resource:create_local( emqx_resource:create_local(
clickhouse,
ResourceID, ResourceID,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?CLICKHOUSE_RESOURCE_MOD, ?CLICKHOUSE_RESOURCE_MOD,
@ -139,7 +138,6 @@ perform_lifecycle_check(ResourceID, InitialConfig) ->
status := InitialStatus status := InitialStatus
}} = }} =
emqx_resource:create_local( emqx_resource:create_local(
clickhouse,
ResourceID, ResourceID,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?CLICKHOUSE_RESOURCE_MOD, ?CLICKHOUSE_RESOURCE_MOD,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_dynamo, [ {application, emqx_bridge_dynamo, [
{description, "EMQX Enterprise Dynamo Bridge"}, {description, "EMQX Enterprise Dynamo Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -17,6 +17,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -68,6 +69,7 @@ fields(config) ->
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> dynamo.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_es, [ {application, emqx_bridge_es, [
{description, "EMQX Enterprise Elastic Search Bridge"}, {description, "EMQX Enterprise Elastic Search Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{modules, [ {modules, [
emqx_bridge_es, emqx_bridge_es,
emqx_bridge_es_connector emqx_bridge_es_connector

View File

@ -14,6 +14,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -207,6 +208,8 @@ base_url(#{server := Server}) -> "http://" ++ Server.
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> elastic_search.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [ {application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"}, {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.3.1"}, {vsn, "0.3.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -8,6 +8,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -84,6 +85,8 @@
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
-spec resource_type() -> resource_type().
resource_type() -> gcp_pubsub_consumer.
-spec callback_mode() -> callback_mode(). -spec callback_mode() -> callback_mode().
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -41,6 +41,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -62,6 +63,7 @@
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
resource_type() -> gcp_pubsub.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -16,6 +16,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -67,6 +68,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
resource_type() -> greptimedb.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_add_channel( on_add_channel(

View File

@ -83,7 +83,6 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
state := #{client := #{pool := ReturnedPoolName}} = State, state := #{client := #{pool := ReturnedPoolName}} = State,
status := InitialStatus status := InitialStatus
}} = emqx_resource:create_local( }} = emqx_resource:create_local(
greptimedb,
PoolName, PoolName,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?GREPTIMEDB_RESOURCE_MOD, ?GREPTIMEDB_RESOURCE_MOD,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_hstreamdb, [ {application, emqx_bridge_hstreamdb, [
{description, "EMQX Enterprise HStreamDB Bridge"}, {description, "EMQX Enterprise HStreamDB Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -16,6 +16,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -44,6 +45,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
resource_type() -> hstreamsdb.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, Config) -> on_start(InstId, Config) ->

View File

@ -26,6 +26,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -183,6 +184,7 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field). ref(Field) -> hoconsc:ref(?MODULE, Field).
%% =================================================================== %% ===================================================================
resource_type() -> webhook.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [ {application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"}, {description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.2.3"}, {vsn, "0.2.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -16,6 +16,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -70,6 +71,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
resource_type() -> influxdb.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_add_channel( on_add_channel(

View File

@ -86,7 +86,6 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
state := #{client := #{pool := ReturnedPoolName}} = State, state := #{client := #{pool := ReturnedPoolName}} = State,
status := InitialStatus status := InitialStatus
}} = emqx_resource:create_local( }} = emqx_resource:create_local(
influxdb,
PoolName, PoolName,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?INFLUXDB_RESOURCE_MOD, ?INFLUXDB_RESOURCE_MOD,
@ -198,7 +197,6 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
config := #{ssl := #{enable := SslEnabled}}, config := #{ssl := #{enable := SslEnabled}},
status := Status status := Status
}} = emqx_resource:create_local( }} = emqx_resource:create_local(
influxdb,
PoolName, PoolName,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?INFLUXDB_RESOURCE_MOD, ?INFLUXDB_RESOURCE_MOD,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [ {application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"}, {description, "EMQX Enterprise Apache IoTDB Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{modules, [ {modules, [
emqx_bridge_iotdb, emqx_bridge_iotdb,
emqx_bridge_iotdb_connector emqx_bridge_iotdb_connector

View File

@ -15,6 +15,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -206,6 +207,8 @@ proplists_without(Keys, List) ->
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> iotdb.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().

View File

@ -7,6 +7,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -125,6 +126,7 @@
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> kafka_consumer.
callback_mode() -> callback_mode() ->
async_if_possible. async_if_possible.

View File

@ -10,6 +10,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
query_mode/1, query_mode/1,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
@ -35,6 +36,8 @@
-define(kafka_client_id, kafka_client_id). -define(kafka_client_id, kafka_client_id).
-define(kafka_producers, kafka_producers). -define(kafka_producers, kafka_producers).
resource_type() -> kafka_producer.
query_mode(#{parameters := #{query_mode := sync}}) -> query_mode(#{parameters := #{query_mode := sync}}) ->
simple_sync_internal_buffer; simple_sync_internal_buffer;
query_mode(_) -> query_mode(_) ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kinesis, [ {application, emqx_bridge_kinesis, [
{description, "EMQX Enterprise Amazon Kinesis Bridge"}, {description, "EMQX Enterprise Amazon Kinesis Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -30,6 +30,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -50,6 +51,7 @@
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
resource_type() -> kinesis_producer.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mongodb, [ {application, emqx_bridge_mongodb, [
{description, "EMQX Enterprise MongoDB Bridge"}, {description, "EMQX Enterprise MongoDB Bridge"},
{vsn, "0.3.2"}, {vsn, "0.3.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -11,6 +11,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
on_remove_channel/3, on_remove_channel/3,
resource_type/0,
callback_mode/0, callback_mode/0,
on_add_channel/4, on_add_channel/4,
on_get_channel_status/3, on_get_channel_status/3,
@ -25,6 +26,7 @@
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> emqx_mongodb:resource_type().
callback_mode() -> emqx_mongodb:callback_mode(). callback_mode() -> emqx_mongodb:callback_mode().

View File

@ -30,6 +30,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -76,6 +77,8 @@ on_message_received(Msg, HookPoints, ResId) ->
ok. ok.
%% =================================================================== %% ===================================================================
resource_type() -> mqtt.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_start(ResourceId, #{server := Server} = Conf) -> on_start(ResourceId, #{server := Server} = Conf) ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mysql, [ {application, emqx_bridge_mysql, [
{description, "EMQX Enterprise MySQL Bridge"}, {description, "EMQX Enterprise MySQL Bridge"},
{vsn, "0.1.7"}, {vsn, "0.1.8"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -10,6 +10,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
on_remove_channel/3, on_remove_channel/3,
resource_type/0,
callback_mode/0, callback_mode/0,
on_add_channel/4, on_add_channel/4,
on_batch_query/3, on_batch_query/3,
@ -24,6 +25,7 @@
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> emqx_mysql:resource_type().
callback_mode() -> emqx_mysql:callback_mode(). callback_mode() -> emqx_mysql:callback_mode().

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_opents, [ {application, emqx_bridge_opents, [
{description, "EMQX Enterprise OpenTSDB Bridge"}, {description, "EMQX Enterprise OpenTSDB Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -18,6 +18,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -114,6 +115,8 @@ connector_example_values() ->
-define(HTTP_CONNECT_TIMEOUT, 1000). -define(HTTP_CONNECT_TIMEOUT, 1000).
resource_type() -> opents.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start( on_start(

View File

@ -10,6 +10,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -55,6 +56,7 @@
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
resource_type() -> pulsar.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [ {application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"}, {description, "EMQX Enterprise RabbitMQ Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{mod, {emqx_bridge_rabbitmq_app, []}}, {mod, {emqx_bridge_rabbitmq_app, []}},
{applications, [ {applications, [

View File

@ -31,6 +31,7 @@
on_remove_channel/3, on_remove_channel/3,
on_get_channels/1, on_get_channels/1,
on_stop/2, on_stop/2,
resource_type/0,
callback_mode/0, callback_mode/0,
on_get_status/2, on_get_status/2,
on_get_channel_status/3, on_get_channel_status/3,
@ -60,6 +61,7 @@ fields(config) ->
%% =================================================================== %% ===================================================================
%% emqx_resource callback %% emqx_resource callback
resource_type() -> rabbitmq.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -135,7 +135,6 @@ check_config(Config) ->
create_local_resource(ResourceID, CheckedConfig) -> create_local_resource(ResourceID, CheckedConfig) ->
{ok, Bridge} = emqx_resource:create_local( {ok, Bridge} = emqx_resource:create_local(
rabbitmq,
ResourceID, ResourceID,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
emqx_bridge_rabbitmq_connector, emqx_bridge_rabbitmq_connector,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_redis, [ {application, emqx_bridge_redis, [
{description, "EMQX Enterprise Redis Bridge"}, {description, "EMQX Enterprise Redis Bridge"},
{vsn, "0.1.8"}, {vsn, "0.1.9"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -12,6 +12,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_add_channel/4, on_add_channel/4,
on_remove_channel/3, on_remove_channel/3,
@ -29,7 +30,9 @@
%% resource callbacks %% resource callbacks
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
callback_mode() -> always_sync. resource_type() -> emqx_redis:resource_type().
callback_mode() -> emqx_redis:callback_mode().
on_add_channel( on_add_channel(
_InstanceId, _InstanceId,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [ {application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [ {env, [

View File

@ -16,6 +16,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -90,6 +91,8 @@ servers() ->
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> rocketmq.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start( on_start(

View File

@ -13,6 +13,7 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -92,6 +93,8 @@
-define(AGGREG_SUP, emqx_bridge_s3_sup). -define(AGGREG_SUP, emqx_bridge_s3_sup).
%% %%
-spec resource_type() -> resource_type().
resource_type() -> s3.
-spec callback_mode() -> callback_mode(). -spec callback_mode() -> callback_mode().
callback_mode() -> callback_mode() ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_sqlserver, [ {application, emqx_bridge_sqlserver, [
{description, "EMQX Enterprise SQL Server Bridge"}, {description, "EMQX Enterprise SQL Server Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, odbc]}, {applications, [kernel, stdlib, emqx_resource, odbc]},
{env, [ {env, [

View File

@ -30,6 +30,7 @@
%% callbacks for behaviour emqx_resource %% callbacks for behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -173,6 +174,7 @@ server() ->
%%==================================================================== %%====================================================================
%% Callbacks defined in emqx_resource %% Callbacks defined in emqx_resource
%%==================================================================== %%====================================================================
resource_type() -> sqlserver.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_syskeeper, [ {application, emqx_bridge_syskeeper, [
{description, "EMQX Enterprise Data bridge for Syskeeper"}, {description, "EMQX Enterprise Data bridge for Syskeeper"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -18,6 +18,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
@ -147,6 +148,7 @@ server() ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
resource_type() -> syskeeper.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -12,6 +12,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
query_mode/1, query_mode/1,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -40,6 +41,8 @@
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% emqx_resource %% emqx_resource
resource_type() ->
syskeeper_proxy_server.
query_mode(_) -> query_mode(_) ->
no_queries. no_queries.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_tdengine, [ {application, emqx_bridge_tdengine, [
{description, "EMQX Enterprise TDEngine Bridge"}, {description, "EMQX Enterprise TDEngine Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -19,6 +19,7 @@
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -140,6 +141,7 @@ connector_example_values() ->
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
resource_type() -> tdengine.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -126,7 +126,6 @@ create(Type, Name, Conf0, Opts) ->
ResourceId = resource_id(Type, Name), ResourceId = resource_id(Type, Name),
Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, Conf = Conf0#{connector_type => TypeBin, connector_name => Name},
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
Type,
ResourceId, ResourceId,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?MODULE:connector_to_resource_type(Type), ?MODULE:connector_to_resource_type(Type),
@ -200,7 +199,6 @@ recreate(Type, Name, Conf) ->
recreate(Type, Name, Conf, Opts) -> recreate(Type, Name, Conf, Opts) ->
TypeBin = bin(Type), TypeBin = bin(Type),
emqx_resource:recreate_local( emqx_resource:recreate_local(
Type,
resource_id(Type, Name), resource_id(Type, Name),
?MODULE:connector_to_resource_type(Type), ?MODULE:connector_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf), parse_confs(TypeBin, Name, Conf),
@ -237,7 +235,7 @@ create_dry_run(Type, Conf0, Callback) ->
{ok, ConfNew} -> {ok, ConfNew} ->
ParseConf = parse_confs(bin(Type), TmpName, ConfNew), ParseConf = parse_confs(bin(Type), TmpName, ConfNew),
emqx_resource:create_dry_run_local( emqx_resource:create_dry_run_local(
Type, TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback
) )
end end
catch catch

View File

@ -50,6 +50,7 @@ t_connector_lifecycle({init, Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok), meck:expect(?CONNECTOR, on_stop, 2, ok),
@ -171,6 +172,7 @@ t_remove_fail({'init', Config}) ->
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{enable => true}}]), meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{enable => true}}]),
meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
@ -234,6 +236,7 @@ t_create_with_bad_name_direct_path({init, Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok), meck:expect(?CONNECTOR, on_stop, 2, ok),
@ -265,6 +268,7 @@ t_create_with_bad_name_root_path({init, Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok), meck:expect(?CONNECTOR, on_stop, 2, ok),
@ -299,6 +303,7 @@ t_no_buffer_workers({'init', Config}) ->
meck:new(emqx_connector_resource, [passthrough]), meck:new(emqx_connector_resource, [passthrough]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]), meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, resource_type, 0, dummy),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_get_channels, 1, []), meck:expect(?CONNECTOR, on_get_channels, 1, []),

View File

@ -224,6 +224,7 @@ init_mocks(_TestCase) ->
meck:new(emqx_connector_resource, [passthrough, no_link]), meck:new(emqx_connector_resource, [passthrough, no_link]),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL),
meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy),
meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible),
meck:expect( meck:expect(
?CONNECTOR_IMPL, ?CONNECTOR_IMPL,

View File

@ -15,8 +15,10 @@
%% this module is only intended to be mocked %% this module is only intended to be mocked
-module(emqx_connector_dummy_impl). -module(emqx_connector_dummy_impl).
-behavior(emqx_resource).
-export([ -export([
resource_type/0,
query_mode/1, query_mode/1,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
@ -25,6 +27,7 @@
on_get_channel_status/3 on_get_channel_status/3
]). ]).
resource_type() -> dummy.
query_mode(_) -> error(unexpected). query_mode(_) -> error(unexpected).
callback_mode() -> error(unexpected). callback_mode() -> error(unexpected).
on_start(_, _) -> error(unexpected). on_start(_, _) -> error(unexpected).

View File

@ -132,7 +132,6 @@ make_resource_id(Backend) ->
create_resource(ResourceId, Module, Config) -> create_resource(ResourceId, Module, Config) ->
Result = emqx_resource:create_local( Result = emqx_resource:create_local(
dashboard_sso,
ResourceId, ResourceId,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
Module, Module,
@ -143,7 +142,7 @@ create_resource(ResourceId, Module, Config) ->
update_resource(ResourceId, Module, Config) -> update_resource(ResourceId, Module, Config) ->
Result = emqx_resource:recreate_local( Result = emqx_resource:recreate_local(
dashboard_sso, ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS
), ),
start_resource_if_enabled(ResourceId, Result, Config). start_resource_if_enabled(ResourceId, Result, Config).

View File

@ -1,6 +1,6 @@
{application, emqx_ldap, [ {application, emqx_ldap, [
{description, "EMQX LDAP Connector"}, {description, "EMQX LDAP Connector"},
{vsn, "0.1.8"}, {vsn, "0.1.9"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -27,6 +27,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -129,6 +130,8 @@ ensure_username(Field) ->
emqx_connector_schema_lib:username(Field). emqx_connector_schema_lib:username(Field).
%% =================================================================== %% ===================================================================
resource_type() -> ldap.
callback_mode() -> always_sync. callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.

View File

@ -96,7 +96,6 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
state := #{pool_name := PoolName} = State, state := #{pool_name := PoolName} = State,
status := InitialStatus status := InitialStatus
}} = emqx_resource:create_local( }} = emqx_resource:create_local(
ldap,
ResourceId, ResourceId,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?LDAP_RESOURCE_MOD, ?LDAP_RESOURCE_MOD,
@ -172,7 +171,6 @@ t_get_status(Config) ->
?LDAP_RESOURCE_MOD, ldap_config(Config) ?LDAP_RESOURCE_MOD, ldap_config(Config)
), ),
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
ldap,
ResourceId, ResourceId,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?LDAP_RESOURCE_MOD, ?LDAP_RESOURCE_MOD,

View File

@ -1,6 +1,6 @@
{application, emqx_mongodb, [ {application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"}, {description, "EMQX MongoDB Connector"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -26,6 +26,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -172,6 +173,7 @@ desc(_) ->
undefined. undefined.
%% =================================================================== %% ===================================================================
resource_type() -> mongodb.
callback_mode() -> always_sync. callback_mode() -> always_sync.

View File

@ -143,7 +143,6 @@ check_config(Config) ->
create_local_resource(ResourceId, CheckedConfig) -> create_local_resource(ResourceId, CheckedConfig) ->
{ok, Bridge} = emqx_resource:create_local( {ok, Bridge} = emqx_resource:create_local(
mongodb,
ResourceId, ResourceId,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?MONGO_RESOURCE_MOD, ?MONGO_RESOURCE_MOD,

View File

@ -1,6 +1,6 @@
{application, emqx_mysql, [ {application, emqx_mysql, [
{description, "EMQX MySQL Database Connector"}, {description, "EMQX MySQL Database Connector"},
{vsn, "0.1.9"}, {vsn, "0.2.0"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -25,6 +25,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -91,6 +92,8 @@ server() ->
emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS).
%% =================================================================== %% ===================================================================
resource_type() -> mysql.
callback_mode() -> always_sync. callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.

View File

@ -67,7 +67,6 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
state := #{pool_name := PoolName} = State, state := #{pool_name := PoolName} = State,
status := InitialStatus status := InitialStatus
}} = emqx_resource:create_local( }} = emqx_resource:create_local(
mysql,
ResourceId, ResourceId,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?MYSQL_RESOURCE_MOD, ?MYSQL_RESOURCE_MOD,

View File

@ -1,6 +1,6 @@
{application, emqx_oracle, [ {application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"}, {description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -21,6 +21,7 @@
%% callbacks for behaviour emqx_resource %% callbacks for behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -67,6 +68,8 @@
batch_params_tokens := params_tokens() batch_params_tokens := params_tokens()
}. }.
resource_type() -> oracle.
% As ecpool is not monitoring the worker's PID when doing a handover_async, the % As ecpool is not monitoring the worker's PID when doing a handover_async, the
% request can be lost if worker crashes. Thus, it's better to force requests to % request can be lost if worker crashes. Thus, it's better to force requests to
% be sync for now. % be sync for now.

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [ {application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"}, {description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -29,6 +29,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -120,6 +121,8 @@ adjust_fields(Fields) ->
). ).
%% =================================================================== %% ===================================================================
resource_type() -> postgresql.
callback_mode() -> always_sync. callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.

View File

@ -75,7 +75,6 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
status := InitialStatus status := InitialStatus
}} = }} =
emqx_resource:create_local( emqx_resource:create_local(
postgresql,
ResourceId, ResourceId,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?PGSQL_RESOURCE_MOD, ?PGSQL_RESOURCE_MOD,

View File

@ -28,6 +28,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
resource_type/0,
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
@ -119,6 +120,8 @@ redis_type(Type) ->
desc => ?DESC(Type) desc => ?DESC(Type)
}}. }}.
resource_type() -> redis.
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, Config0) -> on_start(InstId, Config0) ->

Some files were not shown because too many files have changed in this diff Show More