feat(emqx_connector): load connectors from emqx_connector.conf

This commit is contained in:
Shawn 2021-05-31 22:49:42 +08:00
parent f4bb589079
commit e7ffa07a1a
5 changed files with 96 additions and 35 deletions

View File

@ -1,3 +1,19 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## EMQ X CONNECTOR Plugin ## EMQ X CONNECTOR Plugin
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Base directory for emqx_connector indicating where to load configs from disk.
##
## Value: String
## Default: "{{ etc_dir }}/connectors/"
emqx_connectors: [
{id: "mysql-abc",
type: mysql_connenctor,
config: {}
},
{id: "pgsql-123",
type: pgsql_connenctor,
config: {}
},
]

View File

@ -0,0 +1 @@
-module(emqx_connector).

View File

@ -12,9 +12,26 @@
-export([start/2, stop/1]). -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
load_config(),
emqx_connector_sup:start_link(). emqx_connector_sup:start_link().
stop(_State) -> stop(_State) ->
ok. ok.
%% internal functions %% internal functions
load_config() ->
case hocon:load("etc/plugins/emqx_connector.conf", #{format => map}) of
{ok, #{<<"emqx_connectors">> := Connectors}} ->
lists:foreach(fun load_connector/1, Connectors);
{error, Reason} ->
error(Reason)
end.
load_connector(Config) ->
case emqx_resource:load_instance_from_config(Config) of
{ok, _} -> ok;
{error, already_created} -> ok;
{error, Reason} ->
error({load_connector, Reason})
end.

View File

@ -71,7 +71,9 @@
, list_instances_verbose/0 %% list all the instances , list_instances_verbose/0 %% list all the instances
, get_instance/1 %% return the data of the instance , get_instance/1 %% return the data of the instance
, get_instance_by_type/1 %% return all the instances of the same resource type , get_instance_by_type/1 %% return all the instances of the same resource type
, load_instances/1 %% load instances from config files , load_instances_from_dir/1 %% load instances from a directory
, load_instance_from_file/1 %% load an instance from a config file
, load_instance_from_config/1 %% load an instance from a map or json-string config
% , dependents/1 % , dependents/1
% , inc_counter/2 %% increment the counter of the instance % , inc_counter/2 %% increment the counter of the instance
% , inc_counter/3 %% increment the counter by a given integer % , inc_counter/3 %% increment the counter by a given integer
@ -208,9 +210,17 @@ list_instances_verbose() ->
get_instance_by_type(ResourceType) -> get_instance_by_type(ResourceType) ->
emqx_resource_instance:lookup_by_type(ResourceType). emqx_resource_instance:lookup_by_type(ResourceType).
-spec load_instances(Dir :: string()) -> ok. -spec load_instances_from_dir(Dir :: string()) -> ok.
load_instances(Dir) -> load_instances_from_dir(Dir) ->
emqx_resource_instance:load(Dir). emqx_resource_instance:load_dir(Dir).
-spec load_instance_from_file(File :: string()) -> ok.
load_instance_from_file(File) ->
emqx_resource_instance:load_file(File).
-spec load_instance_from_config(binary() | map()) -> ok.
load_instance_from_config(Config) ->
emqx_resource_instance:load_config(Config).
-spec call_start(instance_id(), module(), resource_config()) -> -spec call_start(instance_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
@ -240,7 +250,7 @@ parse_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
Error -> Error Error -> Error
end; end;
parse_config(ResourceType, RawConfigTerm) -> parse_config(ResourceType, RawConfigTerm) ->
parse_config(ResourceType, jsx:encode(#{<<"config">> => RawConfigTerm})). parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})).
-spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. -spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
do_parse_config(ResourceType, MapConfig) -> do_parse_config(ResourceType, MapConfig) ->
@ -261,7 +271,7 @@ resource_type_from_str(ResourceType) ->
false -> {error, {invalid_resource, Mod}} false -> {error, {invalid_resource, Mod}}
end end
catch error:badarg -> catch error:badarg ->
{error, {not_found, ResourceType}} {error, {resourec_not_found, ResourceType}}
end. end.
call_instance(InstId, Query) -> call_instance(InstId, Query) ->

View File

@ -23,10 +23,13 @@
-export([start_link/2]). -export([start_link/2]).
%% load resource instances from *.conf files %% load resource instances from *.conf files
-export([ load/1 -export([ load_dir/1
, load_file/1
, load_config/1
, lookup/1 , lookup/1
, list_all/0 , list_all/0
, lookup_by_type/1 , lookup_by_type/1
, create_local/3
]). ]).
-export([ hash_call/2 -export([ hash_call/2
@ -82,8 +85,8 @@ lookup_by_type(ResourceType) ->
[Data || #{mod := Mod} = Data <- list_all() [Data || #{mod := Mod} = Data <- list_all()
, Mod =:= ResourceType]. , Mod =:= ResourceType].
-spec load(Dir :: string()) -> ok. -spec load_dir(Dir :: string()) -> ok.
load(Dir) -> load_dir(Dir) ->
lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))). lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))).
load_file(File) -> load_file(File) ->
@ -91,40 +94,48 @@ load_file(File) ->
{error, Reason} -> {error, Reason} ->
logger:error("load resource from ~p failed: ~p", [File, Reason]); logger:error("load resource from ~p failed: ~p", [File, Reason]);
RawConfig -> RawConfig ->
case hocon:binary(RawConfig, #{format => map}) of case load_config(RawConfig) of
{ok, #{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr, {ok, Data} ->
<<"config">> := MapConfig}} -> logger:debug("loaded resource instance from file: ~p, data: ~p",
case emqx_resource:resource_type_from_str(ResourceTypeStr) of [File, Data]);
{ok, ResourceType} ->
parse_and_load_config(Id, ResourceType, MapConfig);
{error, Reason} ->
logger:error("no such resource type: ~s, ~p",
[ResourceTypeStr, Reason])
end;
{error, Reason} -> {error, Reason} ->
logger:error("load resource from ~p failed: ~p", [File, Reason]) logger:error("load resource from ~p failed: ~p", [File, Reason])
end end
end. end.
parse_and_load_config(InstId, ResourceType, MapConfig) -> -spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}.
case emqx_resource:parse_config(ResourceType, MapConfig) of load_config(RawConfig) when is_binary(RawConfig) ->
{error, Reason} -> case hocon:binary(RawConfig, #{format => map}) of
logger:error("parse config for resource ~p of type ~p failed: ~p", {ok, ConfigTerm} -> load_config(ConfigTerm);
[InstId, ResourceType, Reason]); Error -> Error
{ok, InstConf} -> end;
create_instance_local(InstId, ResourceType, InstConf)
load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr,
<<"config">> := MapConfig}) ->
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
{ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig);
Error -> Error
end. end.
create_instance_local(InstId, ResourceType, InstConf) -> parse_and_load_config(InstId, ResourceType, MapConfig) ->
case do_create(InstId, ResourceType, InstConf) of case emqx_resource:parse_config(ResourceType, MapConfig) of
{ok, Data} -> {ok, InstConf} -> create_local(InstId, ResourceType, InstConf);
logger:debug("created ~p resource instance: ~p from config: ~p, Data: ~p", Error -> Error
[ResourceType, InstId, InstConf, Data]);
{error, Reason} ->
logger:error("create ~p resource instance: ~p failed: ~p, config: ~p",
[ResourceType, InstId, Reason, InstConf])
end. end.
create_local(InstId, ResourceType, InstConf) ->
case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of
{ok, Data} -> {ok, Data};
Error -> Error
end.
save_config_to_disk(InstId, ResourceType, Config) ->
file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]),
jsx:encode(#{id => InstId, resource_type => ResourceType, config => Config})).
emqx_data_dir() ->
"data".
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -205,7 +216,13 @@ do_create(InstId, ResourceType, Config) ->
#{mod => ResourceType, config => Config, #{mod => ResourceType, config => Config,
state => ResourceState, status => stopped}}), state => ResourceState, status => stopped}}),
_ = do_health_check(InstId), _ = do_health_check(InstId),
{ok, force_lookup(InstId)}; case save_config_to_disk(InstId, ResourceType, Config) of
ok -> {ok, force_lookup(InstId)};
{error, Reason} ->
logger:error("save config for ~p resource ~p to disk failed: ~p",
[ResourceType, InstId, Reason]),
{error, Reason}
end;
{error, Reason} -> {error, Reason} ->
logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]), logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]),
{error, Reason} {error, Reason}