diff --git a/apps/emqx_connector/README.md b/apps/emqx_connector/README.md index 845da77d0..879669f93 100644 --- a/apps/emqx_connector/README.md +++ b/apps/emqx_connector/README.md @@ -1,8 +1,12 @@ # emqx_connector -A connector is an object that maintains the data related to external resources. +This application is a collection of `connectors`. -For example, a mysql connector is an object that maintains all the mysql connection +A `connector` is a callback module of `emqx_resource` that maintains the data related to +external resources. Put all resource related callback modules in a single application is good as +we can put some util functions/modules here for reusing purpose. + +For example, a mysql connector is an emqx resource that maintains all the mysql connection related parameters (configs) and the TCP connections to the mysql server. An mysql connector can be used as following: diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf index 06f25cde4..db4402d47 100644 --- a/apps/emqx_connector/etc/emqx_connector.conf +++ b/apps/emqx_connector/etc/emqx_connector.conf @@ -2,18 +2,3 @@ ## EMQ X CONNECTOR Plugin ##-------------------------------------------------------------------- -connectors: [ - {id: "mysql-abc" - resource_type: emqx_connector_mysql - config: { - server: "127.0.0.1:3306" - database: mqtt - pool_size: 1 - user: root - password: public - auto_reconnect: true - ssl: false - } - } -] - diff --git a/apps/emqx_connector/priv/emqx_connector.schema b/apps/emqx_connector/priv/emqx_connector.schema index 8d81635d5..b8476c4d9 100644 --- a/apps/emqx_connector/priv/emqx_connector.schema +++ b/apps/emqx_connector/priv/emqx_connector.schema @@ -1,7 +1,2 @@ %%-*- mode: erlang -*- %% emqx_connector config mapping - -{mapping, "connectors", "emqx_connector.connectors", [ - {default, []}, - {datatype, string} -]}. \ No newline at end of file diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index c18106950..dd0359348 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -14,27 +14,3 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_connector). - --export([ load_from_config/1 - , load_connectors/1 - , load_connector/1 - ]). - -load_from_config(Filename) -> - case hocon:load(Filename, #{format => map}) of - {ok, #{<<"connectors">> := Connectors}} -> - load_connectors(Connectors); - {error, Reason} -> - error(Reason) - end. - -load_connectors(Connectors) -> - lists:foreach(fun load_connector/1, Connectors). - -load_connector(Config) -> - case emqx_resource:load_instance_from_config(Config) of - {ok, _} -> ok; - {error, already_created} -> ok; - {error, Reason} -> - error({load_connector, Reason}) - end. diff --git a/apps/emqx_connector/src/emqx_connector_app.erl b/apps/emqx_connector/src/emqx_connector_app.erl index af652733f..4bbad75cf 100644 --- a/apps/emqx_connector/src/emqx_connector_app.erl +++ b/apps/emqx_connector/src/emqx_connector_app.erl @@ -23,8 +23,6 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> - Connectors = proplists:get_value(connectors, application:get_all_env(emqx_connector), []), - emqx_connector:load_connectors(Connectors), emqx_connector_sup:start_link(). stop(_State) -> diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 3d22b4c99..1bc1bbc80 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -20,8 +20,7 @@ -emqx_resource_api_path("connectors/mysql"). --export([ fields/1 - , on_jsonify/1 +-export([ on_jsonify/1 , on_api_reply_format/1 ]). @@ -37,36 +36,38 @@ -export([do_health_check/1]). %%===================================================================== -fields("config") -> +schema() -> emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields(). -on_jsonify(#{server := Server, user := User, database := DB, password := Passwd, - cacertfile := CAFile, keyfile := KeyFile, certfile := CertFile} = Config) -> +on_jsonify(#{<<"server">> := Server, <<"user">> := User, <<"database">> := DB, + <<"password">> := Passwd, <<"cacertfile">> := CAFile, + <<"keyfile">> := KeyFile, <<"certfile">> := CertFile} = Config) -> Config#{ - user => list_to_binary(User), - database => list_to_binary(DB), - password => list_to_binary(Passwd), - server => emqx_connector_schema_lib:ip_port_to_string(Server), - cacertfile => list_to_binary(CAFile), - keyfile => list_to_binary(KeyFile), - certfile => list_to_binary(CertFile) + <<"user">> => list_to_binary(User), + <<"database">> => list_to_binary(DB), + <<"password">> => list_to_binary(Passwd), + <<"server">> => emqx_connector_schema_lib:ip_port_to_string(Server), + <<"cacertfile">> => list_to_binary(CAFile), + <<"keyfile">> => list_to_binary(KeyFile), + <<"certfile">> => list_to_binary(CertFile) }. on_api_reply_format(ResourceData) -> #{config := Conf} = Reply0 = emqx_resource_api:default_api_reply_format(ResourceData), - Reply0#{config => maps:without([cacertfile, keyfile, certfile, verify], Conf)}. + Reply0#{config => maps:without([<<"cacertfile">>, <<"keyfile">>, + <<"certfile">>, <<"verify">>], Conf)}. %% =================================================================== -on_start(InstId, #{server := {Host, Port}, - database := DB, - user := User, - password := Password, - auto_reconnect := AutoReconn, - pool_size := PoolSize} = Config) -> +on_start(InstId, #{<<"server">> := {Host, Port}, + <<"database">> := DB, + <<"user">> := User, + <<"password">> := Password, + <<"auto_reconnect">> := AutoReconn, + <<"pool_size">> := PoolSize} = Config) -> logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]), - SslOpts = case maps:get(ssl, Config) of + SslOpts = case maps:get(<<"ssl">>, Config) of true -> [{ssl, [{server_name_indication, disable} | emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}]; diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 39bb72113..02bed956b 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -52,57 +52,47 @@ ssl_fields() -> , {verify, fun verify/1} ]. -server(mapping) -> "config.server"; server(type) -> ip_port(); server(validator) -> [?REQUIRED("the field 'server' is required")]; server(_) -> undefined. -database(mapping) -> "config.database"; database(type) -> string(); database(validator) -> [?REQUIRED("the field 'server' is required")]; database(_) -> undefined. -pool_size(mapping) -> "config.pool_size"; pool_size(type) -> integer(); pool_size(default) -> 8; pool_size(validator) -> [?MIN(1), ?MAX(64)]; pool_size(_) -> undefined. -user(mapping) -> "config.user"; user(type) -> string(); user(default) -> "root"; user(_) -> undefined. -password(mapping) -> "config.password"; password(type) -> string(); password(default) -> ""; password(_) -> undefined. -auto_reconnect(mapping) -> "config.auto_reconnect"; auto_reconnect(type) -> boolean(); auto_reconnect(default) -> true; auto_reconnect(_) -> undefined. -ssl(mapping) -> "config.ssl"; + ssl(type) -> boolean(); ssl(default) -> false; ssl(_) -> undefined. -cacertfile(mapping) -> "config.cacertfile"; cacertfile(type) -> string(); cacertfile(default) -> ""; cacertfile(_) -> undefined. -keyfile(mapping) -> "config.keyfile"; keyfile(type) -> string(); keyfile(default) -> ""; keyfile(_) -> undefined. -certfile(mapping) -> "config.certfile"; certfile(type) -> string(); certfile(default) -> ""; certfile(_) -> undefined. -verify(mapping) -> "config.verify"; verify(type) -> boolean(); verify(default) -> false; verify(_) -> undefined. diff --git a/apps/emqx_data_bridge/.gitignore b/apps/emqx_data_bridge/.gitignore new file mode 100644 index 000000000..f1c455451 --- /dev/null +++ b/apps/emqx_data_bridge/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/emqx_data_bridge/README.md b/apps/emqx_data_bridge/README.md new file mode 100644 index 000000000..8f76f17a5 --- /dev/null +++ b/apps/emqx_data_bridge/README.md @@ -0,0 +1,10 @@ +# emqx_data_bridge + +EMQ X Data Bridge is an application that managing the resources (see emqx_resource) used by emqx +rule engine. + +It provides CRUD HTTP APIs of the resources, and is also responsible for loading the resources at +startup, and saving configs of resources to `data/` after configs updated. + +The application depends on `emqx_connector` as that's where all the callback modules of `connector` +resources placed. diff --git a/apps/emqx_data_bridge/etc/emqx_data_bridge.conf b/apps/emqx_data_bridge/etc/emqx_data_bridge.conf new file mode 100644 index 000000000..295173e6c --- /dev/null +++ b/apps/emqx_data_bridge/etc/emqx_data_bridge.conf @@ -0,0 +1,30 @@ +##-------------------------------------------------------------------- +## EMQ X Bridge Plugin +##-------------------------------------------------------------------- + +emqx_data_bridge.bridges: [ + {name: "mysql-abc" + type: mysql + config: { + server: "127.0.0.1:3306" + database: mqtt + pool_size: 1 + user: root + password: public + auto_reconnect: true + ssl: false + } + }, + {name: "mysql-def" + type: mysql + config: { + server: "127.0.0.1:3306" + database: mqtt + pool_size: 1 + user: root + password: public + auto_reconnect: true + ssl: false + } + } +] diff --git a/apps/emqx_data_bridge/priv/emqx_data_bridge.schema b/apps/emqx_data_bridge/priv/emqx_data_bridge.schema new file mode 100644 index 000000000..c9cb3f2c0 --- /dev/null +++ b/apps/emqx_data_bridge/priv/emqx_data_bridge.schema @@ -0,0 +1,16 @@ +%%-*- mode: erlang -*- +%% emqx_data_bridge config mapping + +{mapping, "emqx_data_bridge.bridges", "emqx_data_bridge.bridges", [ + {default, []}, + {datatype, string} +]}. + +% fields("emqx_data_bridge") -> +% [ +% {bridges, +% [fun(mapping) -> "emqx_data_bridge.bridges"; +% (type) -> list(); +% (_) -> undefined +% end]} +% ] \ No newline at end of file diff --git a/apps/emqx_data_bridge/rebar.config b/apps/emqx_data_bridge/rebar.config new file mode 100644 index 000000000..cf4cfcf1b --- /dev/null +++ b/apps/emqx_data_bridge/rebar.config @@ -0,0 +1,7 @@ +{erl_opts, [debug_info]}. +{deps, []}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_data_bridge]} +]}. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.app.src b/apps/emqx_data_bridge/src/emqx_data_bridge.app.src new file mode 100644 index 000000000..360511d9b --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.app.src @@ -0,0 +1,15 @@ +{application, emqx_data_bridge, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_data_bridge_app, []}}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.erl b/apps/emqx_data_bridge/src/emqx_data_bridge.erl new file mode 100644 index 000000000..cd26b9d3e --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.erl @@ -0,0 +1,9 @@ +-module(emqx_data_bridge). + +-export([ load_bridges/0 + ]). + +load_bridges() -> + Bridges = proplists:get_value(bridges, + application:get_all_env(emqx_data_bridge), []), + emqx_data_bridge_monitor:ensure_all_started(Bridges). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl new file mode 100644 index 000000000..4ff96e34e --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_data_bridge_app). + +-behaviour(application). + +-emqx_plugin(?MODULE). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + {ok, Sup} = emqx_data_bridge_sup:start_link(), + ok = emqx_data_bridge:load_bridges(), + {ok, Sup}. + +stop(_State) -> + ok. + +%% internal functions diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl new file mode 100644 index 000000000..22c216038 --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl @@ -0,0 +1,64 @@ +%% This process monitors all the data bridges, and try to restart a bridge +%% when one of it stopped. +-module(emqx_data_bridge_monitor). + +-behaviour(gen_server). + +%% API functions +-export([ start_link/0 + , ensure_all_started/1 + ]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +ensure_all_started(Configs) -> + gen_server:cast(?MODULE, {start_and_monitor, Configs}). + +init([]) -> + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast({start_and_monitor, Configs}, State) -> + ok = load_bridges(Configs), + {noreply, State}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%============================================================================ +load_bridges(Configs) -> + lists:foreach(fun load_bridge/1, Configs). + +load_bridge(#{<<"name">> := Name, <<"type">> := Type, + <<"config">> := Config}) -> + case emqx_resource:check_and_load_instance(Name, resource_type(Type), Config) of + {ok, _} -> ok; + {error, already_created} -> ok; + {error, Reason} -> + error({load_bridge, Reason}) + end. + +resource_type(<<"mysql">>) -> emqx_connector_mysql. \ No newline at end of file diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl new file mode 100644 index 000000000..516d9b5ab --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_data_bridge_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_one, + intensity => 10, + period => 10}, + ChildSpecs = [ + #{id => emqx_data_bridge_monitor, + start => {emqx_data_bridge_monitor, start_link, []}, + restart => permanent, + type => worker, + modules => [emqx_data_bridge_monitor]}], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/apps/emqx_resource/examples/log_tracer.erl b/apps/emqx_resource/examples/log_tracer.erl index 99fc5bd3b..f4b531449 100644 --- a/apps/emqx_resource/examples/log_tracer.erl +++ b/apps/emqx_resource/examples/log_tracer.erl @@ -14,10 +14,10 @@ ]). %% callbacks for emqx_resource config schema --export([fields/1]). +-export([schema/0]). -fields(ConfPath) -> - log_tracer_schema:fields(ConfPath). +schema() -> + log_tracer_schema:schema(). on_start(InstId, Config) -> io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]), diff --git a/apps/emqx_resource/examples/log_tracer_schema.erl b/apps/emqx_resource/examples/log_tracer_schema.erl index 2b49aab7f..a8fc55411 100644 --- a/apps/emqx_resource/examples/log_tracer_schema.erl +++ b/apps/emqx_resource/examples/log_tracer_schema.erl @@ -2,7 +2,7 @@ -include_lib("typerefl/include/types.hrl"). --export([fields/1]). +-export([schema/0]). -reflect_type([t_level/0, t_cache_logs_in/0]). @@ -10,15 +10,14 @@ -type t_cache_logs_in() :: memory | file. -fields("config") -> +schema() -> [ {condition, fun condition/1} , {level, fun level/1} , {enable_cache, fun enable_cache/1} , {cache_logs_in, fun cache_logs_in/1} , {cache_log_dir, fun cache_log_dir/1} , {bulk, fun bulk/1} - ]; -fields(_) -> []. + ]. condition(mapping) -> "config.condition"; condition(type) -> map(); diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index c432e683e..b909b0e50 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -37,7 +37,8 @@ %% APIs for instances --export([ parse_config/2 +-export([ check_config/2 + , check_and_load_instance/3 , resource_type_from_str/1 ]). @@ -73,9 +74,6 @@ , list_instances_verbose/0 %% list all the instances , get_instance/1 %% return the data of the instance , get_instance_by_type/1 %% return all the instances of the same resource type - , load_instances_from_dir/1 %% load instances from a directory - , load_instance_from_file/1 %% load an instance from a config file - , load_instance_from_config/1 %% load an instance from a map or json-string config % , dependents/1 % , inc_counter/2 %% increment the counter of the instance % , inc_counter/3 %% increment the counter by a given integer @@ -215,18 +213,6 @@ list_instances_verbose() -> get_instance_by_type(ResourceType) -> emqx_resource_instance:lookup_by_type(ResourceType). --spec load_instances_from_dir(Dir :: string()) -> ok. -load_instances_from_dir(Dir) -> - emqx_resource_instance:load_dir(Dir). - --spec load_instance_from_file(File :: string()) -> ok. -load_instance_from_file(File) -> - emqx_resource_instance:load_file(File). - --spec load_instance_from_config(binary() | map()) -> ok. -load_instance_from_config(Config) -> - emqx_resource_instance:load_config(Config). - -spec call_start(instance_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(InstId, Mod, Config) -> @@ -267,24 +253,30 @@ call_api_reply_format(Mod, Data) -> true -> ?SAFE_CALL(Mod:on_api_reply_format(Data)) end. --spec parse_config(resource_type(), binary() | term()) -> +-spec check_config(resource_type(), binary() | term()) -> {ok, resource_config()} | {error, term()}. -parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> +check_config(ResourceType, RawConfig) when is_binary(RawConfig) -> case hocon:binary(RawConfig, #{format => richmap}) of {ok, MapConfig} -> - do_parse_config(ResourceType, MapConfig); + do_check_config(ResourceType, MapConfig); Error -> Error end; -parse_config(ResourceType, RawConfigTerm) -> - parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})). +check_config(ResourceType, RawConfigTerm) -> + check_config(ResourceType, jsx:encode(#{config => RawConfigTerm})). --spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. -do_parse_config(ResourceType, MapConfig) -> - case ?SAFE_CALL(hocon_schema:generate(ResourceType, MapConfig)) of +-spec do_check_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. +do_check_config(ResourceType, MapConfig) -> + case ?SAFE_CALL(hocon_schema:check(ResourceType, MapConfig)) of {error, Reason} -> {error, Reason}; - Config -> - InstConf = maps:from_list(proplists:get_value(config, Config)), - {ok, InstConf} + Config -> {ok, maps:get(<<"config">>, hocon:richmap_to_map(Config))} + end. + +-spec check_and_load_instance(instance_id(), resource_type(), binary() | term()) -> + {ok, resource_data()} | {error, term()}. +check_and_load_instance(InstId, ResourceType, Config) -> + case check_config(ResourceType, Config) of + {ok, InstConf} -> emqx_resource_instance:create_local(InstId, ResourceType, InstConf); + Error -> Error end. %% ================================================================================= diff --git a/apps/emqx_resource/src/emqx_resource_api.erl b/apps/emqx_resource/src/emqx_resource_api.erl index 1e19cdede..a92d41b75 100644 --- a/apps/emqx_resource/src/emqx_resource_api.erl +++ b/apps/emqx_resource/src/emqx_resource_api.erl @@ -46,7 +46,7 @@ put(Mod, #{id := Id}, Params) -> end. do_put(Mod, Id, ConfigParams, ResourceType, Params) -> - case emqx_resource:parse_config(ResourceType, ConfigParams) of + case emqx_resource:check_config(ResourceType, ConfigParams) of {ok, Config} -> case emqx_resource:update(Id, ResourceType, Config, Params) of {ok, Data} -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index dbbf052a1..470263a6d 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -23,10 +23,7 @@ -export([start_link/2]). %% load resource instances from *.conf files --export([ load_dir/1 - , load_file/1 - , load_config/1 - , lookup/1 +-export([ lookup/1 , list_all/0 , lookup_by_type/1 , create_local/3 @@ -85,44 +82,8 @@ lookup_by_type(ResourceType) -> [Data || #{mod := Mod} = Data <- list_all() , Mod =:= ResourceType]. --spec load_dir(Dir :: string()) -> ok. -load_dir(Dir) -> - lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))). - -load_file(File) -> - case ?SAFE_CALL(hocon_token:read(File)) of - {error, Reason} -> - logger:error("load resource from ~p failed: ~p", [File, Reason]); - RawConfig -> - case load_config(RawConfig) of - {ok, Data} -> - logger:debug("loaded resource instance from file: ~p, data: ~p", - [File, Data]); - {error, Reason} -> - logger:error("load resource from ~p failed: ~p", [File, Reason]) - end - end. - --spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}. -load_config(RawConfig) when is_binary(RawConfig) -> - case hocon:binary(RawConfig, #{format => map}) of - {ok, ConfigTerm} -> load_config(ConfigTerm); - Error -> Error - end; - -load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) -> - MapConfig = maps:get(<<"config">>, Config, #{}), - case emqx_resource:resource_type_from_str(ResourceTypeStr) of - {ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig); - Error -> Error - end. - -parse_and_load_config(InstId, ResourceType, MapConfig) -> - case emqx_resource:parse_config(ResourceType, MapConfig) of - {ok, InstConf} -> create_local(InstId, ResourceType, InstConf); - Error -> Error - end. - +-spec create_local(instance_id(), resource_type(), resource_config()) -> + {ok, resource_data()} | {error, term()}. create_local(InstId, ResourceType, InstConf) -> case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of {ok, Data} -> {ok, Data}; diff --git a/apps/emqx_resource/src/emqx_resource_transform.erl b/apps/emqx_resource/src/emqx_resource_transform.erl index cd6c7e4ae..6207d675e 100644 --- a/apps/emqx_resource/src/emqx_resource_transform.erl +++ b/apps/emqx_resource/src/emqx_resource_transform.erl @@ -68,12 +68,18 @@ form(Mod, Form) -> fix_spec_attrs() -> [ ?Q("-export([emqx_resource_schema/0]).") - , ?Q("-export([structs/0]).") + , ?Q("-export([structs/0, fields/1]).") , ?Q("-behaviour(hocon_schema).") ]. fix_spec_funcs(_Mod) -> - [ (?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.")) + [ ?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.") , ?Q("structs() -> [\"config\"].") + , ?Q("fields(\"config\") -> " + "[fun (type) -> \"schema\"; " + " (_) -> undefined " + " end];" + "fields(\"schema\") -> schema()." + ) ]. fix_api_attrs(Mod, Path) -> diff --git a/data/loaded_plugins.tmpl b/data/loaded_plugins.tmpl index eb4e58d1f..5ac46e0e3 100644 --- a/data/loaded_plugins.tmpl +++ b/data/loaded_plugins.tmpl @@ -7,4 +7,5 @@ {emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}. {emqx_resource, {{enable_plugin_emqx_resource}}}. {emqx_connector, {{enable_plugin_emqx_connector}}}. +{emqx_data_bridge, {{enable_plugin_emqx_data_bridge}}}. {emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}. diff --git a/rebar.config.erl b/rebar.config.erl index fa5586ad5..c5206e9e2 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -182,6 +182,7 @@ overlay_vars_rel(RelType) -> , {enable_plugin_emqx_bridge_mqtt, RelType =:= edge} , {enable_plugin_emqx_resource, true} , {enable_plugin_emqx_connector, true} + , {enable_plugin_emqx_data_bridge, true} , {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce , {enable_plugin_emqx_recon, true} , {enable_plugin_emqx_retainer, true} @@ -277,6 +278,7 @@ relx_plugin_apps(ReleaseType) -> , emqx_recon , emqx_resource , emqx_connector + , emqx_data_bridge , emqx_rule_engine , emqx_sasl ]