From e7ffa07a1a70e13eeb292c7a052764a18871d95b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 31 May 2021 22:49:42 +0800 Subject: [PATCH] feat(emqx_connector): load connectors from emqx_connector.conf --- apps/emqx_connector/etc/emqx_connector.conf | 16 ++++ apps/emqx_connector/src/emqx_connector.erl | 1 + .../emqx_connector/src/emqx_connector_app.erl | 17 +++++ apps/emqx_resource/src/emqx_resource.erl | 22 ++++-- .../src/emqx_resource_instance.erl | 75 ++++++++++++------- 5 files changed, 96 insertions(+), 35 deletions(-) create mode 100644 apps/emqx_connector/src/emqx_connector.erl diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf index 54b57ebe6..d93db8c34 100644 --- a/apps/emqx_connector/etc/emqx_connector.conf +++ b/apps/emqx_connector/etc/emqx_connector.conf @@ -1,3 +1,19 @@ ##-------------------------------------------------------------------- ## EMQ X CONNECTOR Plugin ##-------------------------------------------------------------------- + +## Base directory for emqx_connector indicating where to load configs from disk. +## +## Value: String +## Default: "{{ etc_dir }}/connectors/" +emqx_connectors: [ + {id: "mysql-abc", + type: mysql_connenctor, + config: {} + }, + {id: "pgsql-123", + type: pgsql_connenctor, + config: {} + }, +] + diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl new file mode 100644 index 000000000..ccb88c81a --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -0,0 +1 @@ +-module(emqx_connector). diff --git a/apps/emqx_connector/src/emqx_connector_app.erl b/apps/emqx_connector/src/emqx_connector_app.erl index 402e94187..0e64d11bc 100644 --- a/apps/emqx_connector/src/emqx_connector_app.erl +++ b/apps/emqx_connector/src/emqx_connector_app.erl @@ -12,9 +12,26 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> + load_config(), emqx_connector_sup:start_link(). stop(_State) -> ok. %% internal functions + +load_config() -> + case hocon:load("etc/plugins/emqx_connector.conf", #{format => map}) of + {ok, #{<<"emqx_connectors">> := Connectors}} -> + lists:foreach(fun load_connector/1, Connectors); + {error, Reason} -> + error(Reason) + end. + +load_connector(Config) -> + case emqx_resource:load_instance_from_config(Config) of + {ok, _} -> ok; + {error, already_created} -> ok; + {error, Reason} -> + error({load_connector, Reason}) + end. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b9107d328..00ea1f868 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -71,7 +71,9 @@ , list_instances_verbose/0 %% list all the instances , get_instance/1 %% return the data of the instance , get_instance_by_type/1 %% return all the instances of the same resource type - , load_instances/1 %% load instances from config files + , load_instances_from_dir/1 %% load instances from a directory + , load_instance_from_file/1 %% load an instance from a config file + , load_instance_from_config/1 %% load an instance from a map or json-string config % , dependents/1 % , inc_counter/2 %% increment the counter of the instance % , inc_counter/3 %% increment the counter by a given integer @@ -208,9 +210,17 @@ list_instances_verbose() -> get_instance_by_type(ResourceType) -> emqx_resource_instance:lookup_by_type(ResourceType). --spec load_instances(Dir :: string()) -> ok. -load_instances(Dir) -> - emqx_resource_instance:load(Dir). +-spec load_instances_from_dir(Dir :: string()) -> ok. +load_instances_from_dir(Dir) -> + emqx_resource_instance:load_dir(Dir). + +-spec load_instance_from_file(File :: string()) -> ok. +load_instance_from_file(File) -> + emqx_resource_instance:load_file(File). + +-spec load_instance_from_config(binary() | map()) -> ok. +load_instance_from_config(Config) -> + emqx_resource_instance:load_config(Config). -spec call_start(instance_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. @@ -240,7 +250,7 @@ parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> Error -> Error end; parse_config(ResourceType, RawConfigTerm) -> - parse_config(ResourceType, jsx:encode(#{<<"config">> => RawConfigTerm})). + parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})). -spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. do_parse_config(ResourceType, MapConfig) -> @@ -261,7 +271,7 @@ resource_type_from_str(ResourceType) -> false -> {error, {invalid_resource, Mod}} end catch error:badarg -> - {error, {not_found, ResourceType}} + {error, {resourec_not_found, ResourceType}} end. call_instance(InstId, Query) -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index ca5e4829e..3a750ebf3 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -23,10 +23,13 @@ -export([start_link/2]). %% load resource instances from *.conf files --export([ load/1 +-export([ load_dir/1 + , load_file/1 + , load_config/1 , lookup/1 , list_all/0 , lookup_by_type/1 + , create_local/3 ]). -export([ hash_call/2 @@ -82,8 +85,8 @@ lookup_by_type(ResourceType) -> [Data || #{mod := Mod} = Data <- list_all() , Mod =:= ResourceType]. --spec load(Dir :: string()) -> ok. -load(Dir) -> +-spec load_dir(Dir :: string()) -> ok. +load_dir(Dir) -> lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))). load_file(File) -> @@ -91,40 +94,48 @@ load_file(File) -> {error, Reason} -> logger:error("load resource from ~p failed: ~p", [File, Reason]); RawConfig -> - case hocon:binary(RawConfig, #{format => map}) of - {ok, #{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr, - <<"config">> := MapConfig}} -> - case emqx_resource:resource_type_from_str(ResourceTypeStr) of - {ok, ResourceType} -> - parse_and_load_config(Id, ResourceType, MapConfig); - {error, Reason} -> - logger:error("no such resource type: ~s, ~p", - [ResourceTypeStr, Reason]) - end; + case load_config(RawConfig) of + {ok, Data} -> + logger:debug("loaded resource instance from file: ~p, data: ~p", + [File, Data]); {error, Reason} -> logger:error("load resource from ~p failed: ~p", [File, Reason]) end end. -parse_and_load_config(InstId, ResourceType, MapConfig) -> - case emqx_resource:parse_config(ResourceType, MapConfig) of - {error, Reason} -> - logger:error("parse config for resource ~p of type ~p failed: ~p", - [InstId, ResourceType, Reason]); - {ok, InstConf} -> - create_instance_local(InstId, ResourceType, InstConf) +-spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}. +load_config(RawConfig) when is_binary(RawConfig) -> + case hocon:binary(RawConfig, #{format => map}) of + {ok, ConfigTerm} -> load_config(ConfigTerm); + Error -> Error + end; + +load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr, + <<"config">> := MapConfig}) -> + case emqx_resource:resource_type_from_str(ResourceTypeStr) of + {ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig); + Error -> Error end. -create_instance_local(InstId, ResourceType, InstConf) -> - case do_create(InstId, ResourceType, InstConf) of - {ok, Data} -> - logger:debug("created ~p resource instance: ~p from config: ~p, Data: ~p", - [ResourceType, InstId, InstConf, Data]); - {error, Reason} -> - logger:error("create ~p resource instance: ~p failed: ~p, config: ~p", - [ResourceType, InstId, Reason, InstConf]) +parse_and_load_config(InstId, ResourceType, MapConfig) -> + case emqx_resource:parse_config(ResourceType, MapConfig) of + {ok, InstConf} -> create_local(InstId, ResourceType, InstConf); + Error -> Error end. +create_local(InstId, ResourceType, InstConf) -> + case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of + {ok, Data} -> {ok, Data}; + Error -> Error + end. + +save_config_to_disk(InstId, ResourceType, Config) -> + file:write_file(filename:join([emqx_data_dir(), binary_to_list(InstId) ++ ".conf"]), + jsx:encode(#{id => InstId, resource_type => ResourceType, config => Config})). + +emqx_data_dir() -> + "data". + %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ @@ -205,7 +216,13 @@ do_create(InstId, ResourceType, Config) -> #{mod => ResourceType, config => Config, state => ResourceState, status => stopped}}), _ = do_health_check(InstId), - {ok, force_lookup(InstId)}; + case save_config_to_disk(InstId, ResourceType, Config) of + ok -> {ok, force_lookup(InstId)}; + {error, Reason} -> + logger:error("save config for ~p resource ~p to disk failed: ~p", + [ResourceType, InstId, Reason]), + {error, Reason} + end; {error, Reason} -> logger:error("start ~s resource ~s failed: ~p", [ResourceType, InstId, Reason]), {error, Reason}