diff --git a/apps/emqx_connector/.gitignore b/apps/emqx_connector/.gitignore new file mode 100644 index 000000000..f1c455451 --- /dev/null +++ b/apps/emqx_connector/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/emqx_connector/README.md b/apps/emqx_connector/README.md new file mode 100644 index 000000000..879669f93 --- /dev/null +++ b/apps/emqx_connector/README.md @@ -0,0 +1,27 @@ +# emqx_connector + +This application is a collection of `connectors`. + +A `connector` is a callback module of `emqx_resource` that maintains the data related to +external resources. Put all resource related callback modules in a single application is good as +we can put some util functions/modules here for reusing purpose. + +For example, a mysql connector is an emqx resource that maintains all the mysql connection +related parameters (configs) and the TCP connections to the mysql server. + +An mysql connector can be used as following: + +``` +(emqx@127.0.0.1)5> emqx_resource:list_instances_verbose(). +[#{config => + #{auto_reconnect => true,cacertfile => [],certfile => [], + database => "mqtt",keyfile => [],password => "public", + pool_size => 1, + server => {{127,0,0,1},3306}, + ssl => false,user => "root",verify => false}, + id => <<"mysql-abc">>,mod => emqx_connector_mysql, + state => #{poolname => 'mysql-abc'}, + status => started}] +(emqx@127.0.0.1)6> emqx_resource:query(<<"mysql-abc">>, {sql, <<"SELECT count(1)">>}). +{ok,[<<"count(1)">>],[[1]]} +``` diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf new file mode 100644 index 000000000..db4402d47 --- /dev/null +++ b/apps/emqx_connector/etc/emqx_connector.conf @@ -0,0 +1,4 @@ +##-------------------------------------------------------------------- +## EMQ X CONNECTOR Plugin +##-------------------------------------------------------------------- + diff --git a/apps/emqx_connector/priv/emqx_connector.schema b/apps/emqx_connector/priv/emqx_connector.schema new file mode 100644 index 000000000..b8476c4d9 --- /dev/null +++ b/apps/emqx_connector/priv/emqx_connector.schema @@ -0,0 +1,2 @@ +%%-*- mode: erlang -*- +%% emqx_connector config mapping diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config new file mode 100644 index 000000000..53b5c63e8 --- /dev/null +++ b/apps/emqx_connector/rebar.config @@ -0,0 +1,13 @@ +{erl_opts, [ + nowarn_unused_import, + debug_info +]}. + +{deps, [ + {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}} +]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_connector]} +]}. diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src new file mode 100644 index 000000000..a821b8f13 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -0,0 +1,17 @@ +{application, emqx_connector, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_connector_app, []}}, + {applications, + [kernel, + stdlib, + emqx_resource, + ecpool + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl new file mode 100644 index 000000000..dd0359348 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -0,0 +1,16 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector). diff --git a/apps/emqx_connector/src/emqx_connector_app.erl b/apps/emqx_connector/src/emqx_connector_app.erl new file mode 100644 index 000000000..4bbad75cf --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_app.erl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_app). + +-behaviour(application). + +-emqx_plugin(?MODULE). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + emqx_connector_sup:start_link(). + +stop(_State) -> + ok. + +%% internal functions diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl new file mode 100644 index 000000000..423a6edf1 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_mysql). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +-export([ on_jsonify/1 + ]). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + ]). + +-export([connect/1]). + +-export([do_health_check/1]). + +%%===================================================================== +schema() -> + emqx_connector_schema_lib:relational_db_fields() ++ + emqx_connector_schema_lib:ssl_fields(). + +on_jsonify(#{<<"server">> := Server, <<"user">> := User, <<"database">> := DB, + <<"password">> := Passwd, <<"cacertfile">> := CAFile, + <<"keyfile">> := KeyFile, <<"certfile">> := CertFile} = Config) -> + Config#{ + <<"user">> => list_to_binary(User), + <<"database">> => list_to_binary(DB), + <<"password">> => list_to_binary(Passwd), + <<"server">> => emqx_connector_schema_lib:ip_port_to_string(Server), + <<"cacertfile">> => list_to_binary(CAFile), + <<"keyfile">> => list_to_binary(KeyFile), + <<"certfile">> => list_to_binary(CertFile) + }. + +%% =================================================================== + +on_start(InstId, #{<<"server">> := {Host, Port}, + <<"database">> := DB, + <<"user">> := User, + <<"password">> := Password, + <<"auto_reconnect">> := AutoReconn, + <<"pool_size">> := PoolSize} = Config) -> + logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]), + SslOpts = case maps:get(<<"ssl">>, Config) of + true -> + [{ssl, [{server_name_indication, disable} | + emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}]; + false -> + [] + end, + Options = [{host, Host}, + {port, Port}, + {user, User}, + {password, Password}, + {database, DB}, + {auto_reconnect, reconn_interval(AutoReconn)}, + {pool_size, PoolSize}], + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), + {ok, #{poolname => PoolName}}. + +on_stop(InstId, #{poolname := PoolName}) -> + logger:info("stopping mysql connector: ~p", [InstId]), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> + logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), + case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of + {error, Reason} -> + logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), + emqx_resource:query_failed(AfterQuery); + _ -> + emqx_resource:query_success(AfterQuery) + end, + Result. + +on_health_check(_InstId, #{poolname := PoolName} = State) -> + emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). + +do_health_check(Conn) -> + ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)). + +%% =================================================================== +reconn_interval(true) -> 15; +reconn_interval(false) -> false. + +connect(Options) -> + mysql:start_link(Options). diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl new file mode 100644 index 000000000..02bed956b --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -0,0 +1,111 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_schema_lib). +-include_lib("typerefl/include/types.hrl"). + +-export([ relational_db_fields/0 + , ssl_fields/0 + ]). + +-export([ to_ip_port/1 + , ip_port_to_string/1 + ]). + +-typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}). + +-reflect_type([ip_port/0]). + +-type ip_port() :: tuple(). + +-define(VALID, emqx_resource_validator). +-define(REQUIRED(MSG), ?VALID:required(MSG)). +-define(MAX(MAXV), ?VALID:max(number, MAXV)). +-define(MIN(MINV), ?VALID:min(number, MINV)). + +relational_db_fields() -> + [ {server, fun server/1} + , {database, fun database/1} + , {pool_size, fun pool_size/1} + , {user, fun user/1} + , {password, fun password/1} + , {auto_reconnect, fun auto_reconnect/1} + ]. + +ssl_fields() -> + [ {ssl, fun ssl/1} + , {cacertfile, fun cacertfile/1} + , {keyfile, fun keyfile/1} + , {certfile, fun certfile/1} + , {verify, fun verify/1} + ]. + +server(type) -> ip_port(); +server(validator) -> [?REQUIRED("the field 'server' is required")]; +server(_) -> undefined. + +database(type) -> string(); +database(validator) -> [?REQUIRED("the field 'server' is required")]; +database(_) -> undefined. + +pool_size(type) -> integer(); +pool_size(default) -> 8; +pool_size(validator) -> [?MIN(1), ?MAX(64)]; +pool_size(_) -> undefined. + +user(type) -> string(); +user(default) -> "root"; +user(_) -> undefined. + +password(type) -> string(); +password(default) -> ""; +password(_) -> undefined. + +auto_reconnect(type) -> boolean(); +auto_reconnect(default) -> true; +auto_reconnect(_) -> undefined. + +ssl(type) -> boolean(); +ssl(default) -> false; +ssl(_) -> undefined. + +cacertfile(type) -> string(); +cacertfile(default) -> ""; +cacertfile(_) -> undefined. + +keyfile(type) -> string(); +keyfile(default) -> ""; +keyfile(_) -> undefined. + +certfile(type) -> string(); +certfile(default) -> ""; +certfile(_) -> undefined. + +verify(type) -> boolean(); +verify(default) -> false; +verify(_) -> undefined. + +to_ip_port(Str) -> + case string:tokens(Str, ":") of + [Ip, Port] -> + case inet:parse_address(Ip) of + {ok, R} -> {ok, {R, list_to_integer(Port)}}; + _ -> {error, Str} + end; + _ -> {error, Str} + end. + +ip_port_to_string({Ip, Port}) -> + iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]). diff --git a/apps/emqx_connector/src/emqx_connector_sup.erl b/apps/emqx_connector/src/emqx_connector_sup.erl new file mode 100644 index 000000000..603b9a8ad --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_sup.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 1}, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/apps/emqx_data_bridge/.gitignore b/apps/emqx_data_bridge/.gitignore new file mode 100644 index 000000000..f1c455451 --- /dev/null +++ b/apps/emqx_data_bridge/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/emqx_data_bridge/README.md b/apps/emqx_data_bridge/README.md new file mode 100644 index 000000000..8f76f17a5 --- /dev/null +++ b/apps/emqx_data_bridge/README.md @@ -0,0 +1,10 @@ +# emqx_data_bridge + +EMQ X Data Bridge is an application that managing the resources (see emqx_resource) used by emqx +rule engine. + +It provides CRUD HTTP APIs of the resources, and is also responsible for loading the resources at +startup, and saving configs of resources to `data/` after configs updated. + +The application depends on `emqx_connector` as that's where all the callback modules of `connector` +resources placed. diff --git a/apps/emqx_data_bridge/etc/emqx_data_bridge.conf b/apps/emqx_data_bridge/etc/emqx_data_bridge.conf new file mode 100644 index 000000000..295173e6c --- /dev/null +++ b/apps/emqx_data_bridge/etc/emqx_data_bridge.conf @@ -0,0 +1,30 @@ +##-------------------------------------------------------------------- +## EMQ X Bridge Plugin +##-------------------------------------------------------------------- + +emqx_data_bridge.bridges: [ + {name: "mysql-abc" + type: mysql + config: { + server: "127.0.0.1:3306" + database: mqtt + pool_size: 1 + user: root + password: public + auto_reconnect: true + ssl: false + } + }, + {name: "mysql-def" + type: mysql + config: { + server: "127.0.0.1:3306" + database: mqtt + pool_size: 1 + user: root + password: public + auto_reconnect: true + ssl: false + } + } +] diff --git a/apps/emqx_data_bridge/priv/emqx_data_bridge.schema b/apps/emqx_data_bridge/priv/emqx_data_bridge.schema new file mode 100644 index 000000000..c9cb3f2c0 --- /dev/null +++ b/apps/emqx_data_bridge/priv/emqx_data_bridge.schema @@ -0,0 +1,16 @@ +%%-*- mode: erlang -*- +%% emqx_data_bridge config mapping + +{mapping, "emqx_data_bridge.bridges", "emqx_data_bridge.bridges", [ + {default, []}, + {datatype, string} +]}. + +% fields("emqx_data_bridge") -> +% [ +% {bridges, +% [fun(mapping) -> "emqx_data_bridge.bridges"; +% (type) -> list(); +% (_) -> undefined +% end]} +% ] \ No newline at end of file diff --git a/apps/emqx_data_bridge/rebar.config b/apps/emqx_data_bridge/rebar.config new file mode 100644 index 000000000..cf4cfcf1b --- /dev/null +++ b/apps/emqx_data_bridge/rebar.config @@ -0,0 +1,7 @@ +{erl_opts, [debug_info]}. +{deps, []}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_data_bridge]} +]}. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.app.src b/apps/emqx_data_bridge/src/emqx_data_bridge.app.src new file mode 100644 index 000000000..360511d9b --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.app.src @@ -0,0 +1,15 @@ +{application, emqx_data_bridge, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_data_bridge_app, []}}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.erl b/apps/emqx_data_bridge/src/emqx_data_bridge.erl new file mode 100644 index 000000000..2adc48111 --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_data_bridge). + +-export([ load_bridges/0 + , resource_type/1 + , bridge_type/1 + , name_to_resource_id/1 + , resource_id_to_name/1 + , list_bridges/0 + , is_bridge/1 + ]). + +load_bridges() -> + Bridges = proplists:get_value(bridges, + application:get_all_env(emqx_data_bridge), []), + emqx_data_bridge_monitor:ensure_all_started(Bridges). + +resource_type(<<"mysql">>) -> emqx_connector_mysql. + +bridge_type(emqx_connector_mysql) -> <<"mysql">>. + +name_to_resource_id(BridgeName) -> + <<"bridge:", BridgeName/binary>>. + +resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) -> + BridgeName. + +list_bridges() -> + emqx_resource_api:list_instances(fun emqx_data_bridge:is_bridge/1). + +is_bridge(#{id := <<"bridge:", _/binary>>}) -> + true; +is_bridge(_Data) -> + false. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl new file mode 100644 index 000000000..4de432ca8 --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl @@ -0,0 +1,114 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_data_bridge_api). + +-rest_api(#{ name => list_data_bridges + , method => 'GET' + , path => "/data_bridges" + , func => list_bridges + , descr => "List all data bridges" + }). + +-rest_api(#{ name => get_data_bridge + , method => 'GET' + , path => "/data_bridges/:bin:name" + , func => get_bridge + , descr => "Get a data bridge by name" + }). + +-rest_api(#{ name => create_data_bridge + , method => 'POST' + , path => "/data_bridges/:bin:name" + , func => create_bridge + , descr => "Create a new data bridge" + }). + +-rest_api(#{ name => update_data_bridge + , method => 'POST' + , path => "/data_bridges/:bin:name" + , func => update_bridge + , descr => "Update an existing data bridge" + }). + +-rest_api(#{ name => delete_data_bridge + , method => 'DELETE' + , path => "/data_bridges/:bin:name" + , func => delete_bridge + , descr => "Delete an existing data bridge" + }). + +-export([ list_bridges/2 + , get_bridge/2 + , create_bridge/2 + , update_bridge/2 + , delete_bridge/2 + ]). + +list_bridges(_Binding, _Params) -> + {200, #{code => 0, data => [format_api_reply(Data) || + Data <- emqx_data_bridge:list_bridges()]}}. + +get_bridge(#{name := Name}, _Params) -> + case emqx_resource:get_instance(emqx_data_bridge:name_to_resource_id(Name)) of + {ok, Data} -> + {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; + {error, not_found} -> + {404, #{code => 102, message => <<"not_found: ", Name/binary>>}} + end. + +create_bridge(#{name := Name}, Params) -> + Config = proplists:get_value(<<"config">>, Params), + BridgeType = proplists:get_value(<<"type">>, Params), + case emqx_resource:check_and_create( + emqx_data_bridge:name_to_resource_id(Name), + emqx_data_bridge:resource_type(BridgeType), Config) of + {ok, Data} -> + {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; + {error, already_created} -> + {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; + {error, Reason0} -> + Reason = emqx_resource_api:stringnify(Reason0), + {500, #{code => 102, message => <<"create bridge ", Name/binary, + " failed:", Reason/binary>>}} + end. + +update_bridge(#{name := Name}, Params) -> + Config = proplists:get_value(<<"config">>, Params), + BridgeType = proplists:get_value(<<"type">>, Params), + case emqx_resource:check_and_update( + emqx_data_bridge:name_to_resource_id(Name), + emqx_data_bridge:resource_type(BridgeType), Config, []) of + {ok, Data} -> + {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; + {error, not_found} -> + {400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}}; + {error, Reason0} -> + Reason = emqx_resource_api:stringnify(Reason0), + {500, #{code => 102, message => <<"update bridge ", Name/binary, + " failed:", Reason/binary>>}} + end. + +delete_bridge(#{name := Name}, _Params) -> + case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of + ok -> {200, #{code => 0, data => #{}}}; + {error, Reason} -> + {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} + end. + +format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := Status}) -> + #{type => emqx_data_bridge:bridge_type(Type), + name => emqx_data_bridge:resource_id_to_name(Id), + config => Conf, status => Status}. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl new file mode 100644 index 000000000..4ff96e34e --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_data_bridge_app). + +-behaviour(application). + +-emqx_plugin(?MODULE). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + {ok, Sup} = emqx_data_bridge_sup:start_link(), + ok = emqx_data_bridge:load_bridges(), + {ok, Sup}. + +stop(_State) -> + ok. + +%% internal functions diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_config_handler.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_config_handler.erl new file mode 100644 index 000000000..387f2b153 --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_config_handler.erl @@ -0,0 +1,69 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_data_bridge_config_handler). + +-behaviour(gen_server). + +%% API functions +-export([ start_link/0 + , notify_updated/0 + ]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +notify_updated() -> + gen_server:cast(?MODULE, updated). + +init([]) -> + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(updated, State) -> + Configs = [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()], + emqx_config_handler ! {emqx_data_bridge, Configs}, + {noreply, State}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%============================================================================ + +format_conf(#{resource_type := Type, id := Id, config := Conf}) -> + #{type => Type, name => emqx_data_bridge:resource_id_to_name(Id), + config => Conf}. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl new file mode 100644 index 000000000..f3b6e6736 --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl @@ -0,0 +1,79 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +%% This process monitors all the data bridges, and try to restart a bridge +%% when one of it stopped. +-module(emqx_data_bridge_monitor). + +-behaviour(gen_server). + +%% API functions +-export([ start_link/0 + , ensure_all_started/1 + ]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +ensure_all_started(Configs) -> + gen_server:cast(?MODULE, {start_and_monitor, Configs}). + +init([]) -> + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast({start_and_monitor, Configs}, State) -> + ok = load_bridges(Configs), + {noreply, State}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%============================================================================ +load_bridges(Configs) -> + lists:foreach(fun load_bridge/1, Configs). + +load_bridge(#{<<"name">> := Name, <<"type">> := Type, + <<"config">> := Config}) -> + case emqx_resource:check_and_create_local( + emqx_data_bridge:name_to_resource_id(Name), + emqx_data_bridge:resource_type(Type), Config) of + {ok, _} -> ok; + {error, already_created} -> ok; + {error, Reason} -> + error({load_bridge, Reason}) + end. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl new file mode 100644 index 000000000..516d9b5ab --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_data_bridge_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_one, + intensity => 10, + period => 10}, + ChildSpecs = [ + #{id => emqx_data_bridge_monitor, + start => {emqx_data_bridge_monitor, start_link, []}, + restart => permanent, + type => worker, + modules => [emqx_data_bridge_monitor]}], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl new file mode 100644 index 000000000..dddc44f4e --- /dev/null +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_libs_pool). + +-export([ start_pool/3 + , stop_pool/1 + , pool_name/1 + , health_check/3 + ]). + +pool_name(ID) when is_binary(ID) -> + list_to_atom(binary_to_list(ID)). + +start_pool(Name, Mod, Options) -> + case ecpool:start_sup_pool(Name, Mod, Options) of + {ok, _} -> logger:log(info, "Initiated ~0p Successfully", [Name]); + {error, {already_started, _Pid}} -> + stop_pool(Name), + start_pool(Name, Mod, Options); + {error, Reason} -> + logger:log(error, "Initiate ~0p failed ~0p", [Name, Reason]), + error({start_pool_failed, Name}) + end. + +stop_pool(Name) -> + case ecpool:stop_sup_pool(Name) of + ok -> logger:log(info, "Destroyed ~0p Successfully", [Name]); + {error, not_found} -> ok; + {error, Reason} -> + logger:log(error, "Destroy ~0p failed, ~0p", [Name, Reason]), + error({stop_pool_failed, Name}) + end. + +health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> + Status = [begin + case ecpool_worker:client(Worker) of + {ok, Conn} -> CheckFunc(Conn); + _ -> false + end + end || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of + true -> {ok, State}; + false -> {error, test_query_failed} + end. diff --git a/apps/emqx_resource/examples/log_tracer.erl b/apps/emqx_resource/examples/log_tracer.erl index 99fc5bd3b..f4b531449 100644 --- a/apps/emqx_resource/examples/log_tracer.erl +++ b/apps/emqx_resource/examples/log_tracer.erl @@ -14,10 +14,10 @@ ]). %% callbacks for emqx_resource config schema --export([fields/1]). +-export([schema/0]). -fields(ConfPath) -> - log_tracer_schema:fields(ConfPath). +schema() -> + log_tracer_schema:schema(). on_start(InstId, Config) -> io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]), diff --git a/apps/emqx_resource/examples/log_tracer_schema.erl b/apps/emqx_resource/examples/log_tracer_schema.erl index 2b49aab7f..a8fc55411 100644 --- a/apps/emqx_resource/examples/log_tracer_schema.erl +++ b/apps/emqx_resource/examples/log_tracer_schema.erl @@ -2,7 +2,7 @@ -include_lib("typerefl/include/types.hrl"). --export([fields/1]). +-export([schema/0]). -reflect_type([t_level/0, t_cache_logs_in/0]). @@ -10,15 +10,14 @@ -type t_cache_logs_in() :: memory | file. -fields("config") -> +schema() -> [ {condition, fun condition/1} , {level, fun level/1} , {enable_cache, fun enable_cache/1} , {cache_logs_in, fun cache_logs_in/1} , {cache_log_dir, fun cache_log_dir/1} , {bulk, fun bulk/1} - ]; -fields(_) -> []. + ]. condition(mapping) -> "config.condition"; condition(type) -> map(); diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 7470b823d..b2bbc586f 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -37,7 +37,11 @@ %% APIs for instances --export([ parse_config/2 +-export([ check_config/2 + , check_and_create/3 + , check_and_create_local/3 + , check_and_update/4 + , check_and_update_local/4 , resource_type_from_str/1 ]). @@ -45,10 +49,13 @@ %% provisional solution: rpc:multical to all the nodes for creating/updating/removing %% todo: replicate operations -export([ create/3 %% store the config and start the instance + , create_local/3 , create_dry_run/3 %% run start/2, health_check/2 and stop/1 sequentially + , create_dry_run_local/3 , update/4 %% update the config, stop the old instance and start the new one - %% it will create a new resource when the id does not exist + , update_local/4 , remove/1 %% remove the config and stop the instance + , remove_local/1 ]). %% Calls to the callback module with current resource state @@ -66,16 +73,12 @@ , call_stop/3 %% stop the instance , call_config_merge/4 %% merge the config when updating , call_jsonify/2 - , call_api_reply_format/2 ]). -export([ list_instances/0 %% list all the instances, id only. , list_instances_verbose/0 %% list all the instances , get_instance/1 %% return the data of the instance - , get_instance_by_type/1 %% return all the instances of the same resource type - , load_instances_from_dir/1 %% load instances from a directory - , load_instance_from_file/1 %% load an instance from a config file - , load_instance_from_config/1 %% load an instance from a map or json-string config + , list_instances_by_type/1 %% return all the instances of the same resource type % , dependents/1 % , inc_counter/2 %% increment the counter of the instance % , inc_counter/3 %% increment the counter by a given integer @@ -154,22 +157,42 @@ query_failed({_, {OnFailed, Args}}) -> -spec create(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(call_instance, [InstId, {create, InstId, ResourceType, Config}], {ok, _}). + ?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}). + +-spec create_local(instance_id(), resource_type(), resource_config()) -> + {ok, resource_data()} | {error, Reason :: term()}. +create_local(InstId, ResourceType, Config) -> + call_instance(InstId, {create, InstId, ResourceType, Config}). -spec create_dry_run(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(call_instance, [InstId, {create_dry_run, InstId, ResourceType, Config}]). + ?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]). + +-spec create_dry_run_local(instance_id(), resource_type(), resource_config()) -> + ok | {error, Reason :: term()}. +create_dry_run_local(InstId, ResourceType, Config) -> + call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). -spec update(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. update(InstId, ResourceType, Config, Params) -> - ?CLUSTER_CALL(call_instance, [InstId, {update, InstId, ResourceType, Config, Params}], {ok, _}). + ?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}). + +-spec update_local(instance_id(), resource_type(), resource_config(), term()) -> + {ok, resource_data()} | {error, Reason :: term()}. +update_local(InstId, ResourceType, Config, Params) -> + call_instance(InstId, {update, InstId, ResourceType, Config, Params}). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - ?CLUSTER_CALL(call_instance, [InstId, {remove, InstId}]). + ?CLUSTER_CALL(remove_local, [InstId]). +-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. +remove_local(InstId) -> + call_instance(InstId, {remove, InstId}). + +%% ================================================================================= -spec query(instance_id(), Request :: term()) -> Result :: term(). query(InstId, Request) -> query(InstId, Request, undefined). @@ -211,22 +234,10 @@ list_instances() -> list_instances_verbose() -> emqx_resource_instance:list_all(). --spec get_instance_by_type(module()) -> [resource_data()]. -get_instance_by_type(ResourceType) -> +-spec list_instances_by_type(module()) -> [resource_data()]. +list_instances_by_type(ResourceType) -> emqx_resource_instance:lookup_by_type(ResourceType). --spec load_instances_from_dir(Dir :: string()) -> ok. -load_instances_from_dir(Dir) -> - emqx_resource_instance:load_dir(Dir). - --spec load_instance_from_file(File :: string()) -> ok. -load_instance_from_file(File) -> - emqx_resource_instance:load_file(File). - --spec load_instance_from_config(binary() | map()) -> {ok, resource_data()} | {error, term()}. -load_instance_from_config(Config) -> - emqx_resource_instance:load_config(Config). - -spec call_start(instance_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(InstId, Mod, Config) -> @@ -260,31 +271,52 @@ call_jsonify(Mod, Config) -> true -> ?SAFE_CALL(Mod:on_jsonify(Config)) end. --spec call_api_reply_format(module(), resource_data()) -> jsx:json_term(). -call_api_reply_format(Mod, Data) -> - case erlang:function_exported(Mod, on_api_reply_format, 1) of - false -> emqx_resource_api:default_api_reply_format(Data); - true -> ?SAFE_CALL(Mod:on_api_reply_format(Data)) - end. - --spec parse_config(resource_type(), binary() | term()) -> +-spec check_config(resource_type(), binary() | term()) -> {ok, resource_config()} | {error, term()}. -parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> +check_config(ResourceType, RawConfig) when is_binary(RawConfig) -> case hocon:binary(RawConfig, #{format => richmap}) of {ok, MapConfig} -> - do_parse_config(ResourceType, MapConfig); + do_check_config(ResourceType, MapConfig); Error -> Error end; -parse_config(ResourceType, RawConfigTerm) -> - parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})). +check_config(ResourceType, RawConfigTerm) -> + check_config(ResourceType, jsx:encode(#{config => RawConfigTerm})). --spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. -do_parse_config(ResourceType, MapConfig) -> - case ?SAFE_CALL(hocon_schema:generate(ResourceType, MapConfig)) of +-spec do_check_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. +do_check_config(ResourceType, MapConfig) -> + case ?SAFE_CALL(hocon_schema:check(ResourceType, MapConfig)) of {error, Reason} -> {error, Reason}; - Config -> - InstConf = maps:from_list(proplists:get_value(config, Config)), - {ok, InstConf} + Config -> {ok, maps:get(<<"config">>, hocon_schema:richmap_to_map(Config))} + end. + +-spec check_and_create(instance_id(), resource_type(), binary() | term()) -> + {ok, resource_data()} | {error, term()}. +check_and_create(InstId, ResourceType, Config) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> create(InstId, ResourceType, InstConf) end). + +-spec check_and_create_local(instance_id(), resource_type(), binary() | term()) -> + {ok, resource_data()} | {error, term()}. +check_and_create_local(InstId, ResourceType, Config) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end). + +-spec check_and_update(instance_id(), resource_type(), binary() | term(), term()) -> + {ok, resource_data()} | {error, term()}. +check_and_update(InstId, ResourceType, Config, Params) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> update(InstId, ResourceType, InstConf, Params) end). + +-spec check_and_update_local(instance_id(), resource_type(), binary() | term(), term()) -> + {ok, resource_data()} | {error, term()}. +check_and_update_local(InstId, ResourceType, Config, Params) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> update_local(InstId, ResourceType, InstConf, Params) end). + +check_and_do(ResourceType, Config, Do) when is_function(Do) -> + case check_config(ResourceType, Config) of + {ok, InstConf} -> Do(InstConf); + Error -> Error end. %% ================================================================================= diff --git a/apps/emqx_resource/src/emqx_resource_api.erl b/apps/emqx_resource/src/emqx_resource_api.erl index 1e19cdede..fe1ca4509 100644 --- a/apps/emqx_resource/src/emqx_resource_api.erl +++ b/apps/emqx_resource/src/emqx_resource_api.erl @@ -15,61 +15,16 @@ %%-------------------------------------------------------------------- -module(emqx_resource_api). --export([ get_all/3 - , get/3 - , put/3 - , delete/3 +-export([ list_instances/1 + , format_data/1 + , stringnify/1 ]). --export([default_api_reply_format/1]). +list_instances(Filter) -> + [format_data(Data) || Data <- emqx_resource:list_instances_verbose(), Filter(Data)]. -get_all(Mod, _Binding, _Params) -> - {200, #{code => 0, data => - [format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}. - -get(Mod, #{id := Id}, _Params) -> - case emqx_resource:get_instance(stringnify(Id)) of - {ok, Data} -> - {200, #{code => 0, data => format_data(Mod, Data)}}; - {error, not_found} -> - {404, #{code => 102, message => {resource_instance_not_found, stringnify(Id)}}} - end. - -put(Mod, #{id := Id}, Params) -> - ConfigParams = proplists:get_value(<<"config">>, Params), - ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}), - case emqx_resource:resource_type_from_str(ResourceTypeStr) of - {ok, ResourceType} -> - do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params); - {error, Reason} -> - {404, #{code => 102, message => stringnify(Reason)}} - end. - -do_put(Mod, Id, ConfigParams, ResourceType, Params) -> - case emqx_resource:parse_config(ResourceType, ConfigParams) of - {ok, Config} -> - case emqx_resource:update(Id, ResourceType, Config, Params) of - {ok, Data} -> - {200, #{code => 0, data => format_data(Mod, Data)}}; - {error, Reason} -> - {500, #{code => 102, message => stringnify(Reason)}} - end; - {error, Reason} -> - {400, #{code => 108, message => stringnify(Reason)}} - end. - -delete(_Mod, #{id := Id}, _Params) -> - case emqx_resource:remove(stringnify(Id)) of - ok -> {200, #{code => 0, data => #{}}}; - {error, Reason} -> - {500, #{code => 102, message => stringnify(Reason)}} - end. - -format_data(Mod, Data) -> - emqx_resource:call_api_reply_format(Mod, Data). - -default_api_reply_format(#{id := Id, mod := Mod, status := Status, config := Config}) -> - #{node => node(), id => Id, status => Status, resource_type => Mod, +format_data(#{id := Id, mod := Mod, status := Status, config := Config}) -> + #{id => Id, status => Status, resource_type => Mod, config => emqx_resource:call_jsonify(Mod, Config)}. stringnify(Bin) when is_binary(Bin) -> Bin; diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index dbbf052a1..1e924e249 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -23,10 +23,7 @@ -export([start_link/2]). %% load resource instances from *.conf files --export([ load_dir/1 - , load_file/1 - , load_config/1 - , lookup/1 +-export([ lookup/1 , list_all/0 , lookup_by_type/1 , create_local/3 @@ -85,44 +82,8 @@ lookup_by_type(ResourceType) -> [Data || #{mod := Mod} = Data <- list_all() , Mod =:= ResourceType]. --spec load_dir(Dir :: string()) -> ok. -load_dir(Dir) -> - lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))). - -load_file(File) -> - case ?SAFE_CALL(hocon_token:read(File)) of - {error, Reason} -> - logger:error("load resource from ~p failed: ~p", [File, Reason]); - RawConfig -> - case load_config(RawConfig) of - {ok, Data} -> - logger:debug("loaded resource instance from file: ~p, data: ~p", - [File, Data]); - {error, Reason} -> - logger:error("load resource from ~p failed: ~p", [File, Reason]) - end - end. - --spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}. -load_config(RawConfig) when is_binary(RawConfig) -> - case hocon:binary(RawConfig, #{format => map}) of - {ok, ConfigTerm} -> load_config(ConfigTerm); - Error -> Error - end; - -load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) -> - MapConfig = maps:get(<<"config">>, Config, #{}), - case emqx_resource:resource_type_from_str(ResourceTypeStr) of - {ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig); - Error -> Error - end. - -parse_and_load_config(InstId, ResourceType, MapConfig) -> - case emqx_resource:parse_config(ResourceType, MapConfig) of - {ok, InstConf} -> create_local(InstId, ResourceType, InstConf); - Error -> Error - end. - +-spec create_local(instance_id(), resource_type(), resource_config()) -> + {ok, resource_data()} | {error, term()}. create_local(InstId, ResourceType, InstConf) -> case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of {ok, Data} -> {ok, Data}; @@ -206,7 +167,7 @@ do_update(InstId, ResourceType, NewConfig, Params) -> {ok, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> - do_create(InstId, ResourceType, NewConfig) + {error, not_found} end. do_create(InstId, ResourceType, Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_transform.erl b/apps/emqx_resource/src/emqx_resource_transform.erl index 41e79bde7..34c6633ef 100644 --- a/apps/emqx_resource/src/emqx_resource_transform.erl +++ b/apps/emqx_resource/src/emqx_resource_transform.erl @@ -22,7 +22,7 @@ parse_transform(Forms, _Opts) -> Mod = hd([M || {attribute, _, module, M} <- Forms]), AST = trans(Mod, proplists:delete(eof, Forms)), - debug_print(Mod, AST), + _ = debug_print(Mod, AST), AST. -ifdef(RESOURCE_DEBUG). @@ -47,68 +47,34 @@ trans(Mod, Forms) -> forms(Mod, [F0 | Fs0]) -> case form(Mod, F0) of - {CurrForm, AppendedForms} -> - CurrForm ++ forms(Mod, Fs0) ++ AppendedForms; - {AHeadForms, CurrForm, AppendedForms} -> - AHeadForms ++ CurrForm ++ forms(Mod, Fs0) ++ AppendedForms + {CurrForms, AppendedForms} -> + CurrForms ++ forms(Mod, Fs0) ++ AppendedForms; + {CurrForms, FollowerForms, AppendedForms} -> + CurrForms ++ FollowerForms ++ forms(Mod, Fs0) ++ AppendedForms end; forms(_, []) -> []. form(Mod, Form) -> case Form of - ?Q("-emqx_resource_api_path('@Path').") -> - {fix_spec_attrs() ++ fix_api_attrs(Mod, erl_syntax:concrete(Path)) - ++ fix_api_exports(), - [], - fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)}; + ?Q("-module('@_').") -> + {[Form], fix_spec_attrs(), fix_spec_funcs(Mod)}; _ -> %io:format("---other form: ~p~n", [Form]), - {[], [Form], []} + {[Form], [], []} end. fix_spec_attrs() -> [ ?Q("-export([emqx_resource_schema/0]).") - , ?Q("-export([structs/0]).") + , ?Q("-export([structs/0, fields/1]).") , ?Q("-behaviour(hocon_schema).") ]. fix_spec_funcs(_Mod) -> - [ (?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.")) + [ ?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.") , ?Q("structs() -> [\"config\"].") + , ?Q("fields(\"config\") -> " + "[fun (type) -> \"schema\"; " + " (_) -> undefined " + " end];" + "fields(\"schema\") -> schema()." + ) ]. - -fix_api_attrs(Mod, Path) -> - BaseName = atom_to_list(Mod), - [erl_syntax:revert( - erl_syntax:attribute(?Q("rest_api"), [ - erl_syntax:abstract(#{ - name => list_to_atom(Act ++ "_" ++ BaseName), - method => Method, - path => mk_path(Path, WithId), - func => Func, - descr => Act ++ " the " ++ BaseName})])) - || {Act, Method, WithId, Func} <- [ - {"list", 'GET', noid, api_get_all}, - {"get", 'GET', id, api_get}, - {"update", 'PUT', id, api_put}, - {"delete", 'DELETE', id, api_delete}]]. - -fix_api_exports() -> - [?Q("-export([api_get_all/2, api_get/2, api_put/2, api_delete/2]).")]. - -fix_api_funcs(Mod) -> - [erl_syntax:revert(?Q( - "api_get_all(Binding, Params) -> - emqx_resource_api:get_all('@Mod@', Binding, Params).")), - erl_syntax:revert(?Q( - "api_get(Binding, Params) -> - emqx_resource_api:get('@Mod@', Binding, Params).")), - erl_syntax:revert(?Q( - "api_put(Binding, Params) -> - emqx_resource_api:put('@Mod@', Binding, Params).")), - erl_syntax:revert(?Q( - "api_delete(Binding, Params) -> - emqx_resource_api:delete('@Mod@', Binding, Params).")) - ]. - -mk_path(Path, id) -> string:trim(Path, trailing, "/") ++ "/:bin:id"; -mk_path(Path, noid) -> Path. diff --git a/data/loaded_plugins.tmpl b/data/loaded_plugins.tmpl index 236eeaa95..5ac46e0e3 100644 --- a/data/loaded_plugins.tmpl +++ b/data/loaded_plugins.tmpl @@ -6,4 +6,6 @@ {emqx_telemetry, {{enable_plugin_emqx_telemetry}}}. {emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}. {emqx_resource, {{enable_plugin_emqx_resource}}}. +{emqx_connector, {{enable_plugin_emqx_connector}}}. +{emqx_data_bridge, {{enable_plugin_emqx_data_bridge}}}. {emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}. diff --git a/rebar.config.erl b/rebar.config.erl index df943cc42..5cc857398 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -189,6 +189,8 @@ overlay_vars_rel(RelType) -> [ {enable_plugin_emqx_rule_engine, RelType =:= cloud} , {enable_plugin_emqx_bridge_mqtt, RelType =:= edge} , {enable_plugin_emqx_resource, true} + , {enable_plugin_emqx_connector, true} + , {enable_plugin_emqx_data_bridge, true} , {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce , {enable_plugin_emqx_recon, true} , {enable_plugin_emqx_retainer, true} @@ -284,6 +286,8 @@ relx_plugin_apps(ReleaseType) -> , emqx_web_hook , emqx_recon , emqx_resource + , emqx_connector + , emqx_data_bridge , emqx_rule_engine , emqx_sasl ]