feat(emqx_data_bridge): create emqx_data_bridge

This commit is contained in:
Shawn 2021-06-04 23:47:16 +08:00
parent 3da62e59d6
commit f1552f4f4f
25 changed files with 311 additions and 158 deletions

View File

@ -1,8 +1,12 @@
# emqx_connector
A connector is an object that maintains the data related to external resources.
This application is a collection of `connectors`.
For example, a mysql connector is an object that maintains all the mysql connection
A `connector` is a callback module of `emqx_resource` that maintains the data related to
external resources. Put all resource related callback modules in a single application is good as
we can put some util functions/modules here for reusing purpose.
For example, a mysql connector is an emqx resource that maintains all the mysql connection
related parameters (configs) and the TCP connections to the mysql server.
An mysql connector can be used as following:

View File

@ -2,18 +2,3 @@
## EMQ X CONNECTOR Plugin
##--------------------------------------------------------------------
connectors: [
{id: "mysql-abc"
resource_type: emqx_connector_mysql
config: {
server: "127.0.0.1:3306"
database: mqtt
pool_size: 1
user: root
password: public
auto_reconnect: true
ssl: false
}
}
]

View File

@ -1,7 +1,2 @@
%%-*- mode: erlang -*-
%% emqx_connector config mapping
{mapping, "connectors", "emqx_connector.connectors", [
{default, []},
{datatype, string}
]}.

View File

@ -14,27 +14,3 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector).
-export([ load_from_config/1
, load_connectors/1
, load_connector/1
]).
load_from_config(Filename) ->
case hocon:load(Filename, #{format => map}) of
{ok, #{<<"connectors">> := Connectors}} ->
load_connectors(Connectors);
{error, Reason} ->
error(Reason)
end.
load_connectors(Connectors) ->
lists:foreach(fun load_connector/1, Connectors).
load_connector(Config) ->
case emqx_resource:load_instance_from_config(Config) of
{ok, _} -> ok;
{error, already_created} -> ok;
{error, Reason} ->
error({load_connector, Reason})
end.

View File

@ -23,8 +23,6 @@
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
Connectors = proplists:get_value(connectors, application:get_all_env(emqx_connector), []),
emqx_connector:load_connectors(Connectors),
emqx_connector_sup:start_link().
stop(_State) ->

View File

@ -20,8 +20,7 @@
-emqx_resource_api_path("connectors/mysql").
-export([ fields/1
, on_jsonify/1
-export([ on_jsonify/1
, on_api_reply_format/1
]).
@ -37,36 +36,38 @@
-export([do_health_check/1]).
%%=====================================================================
fields("config") ->
schema() ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
on_jsonify(#{server := Server, user := User, database := DB, password := Passwd,
cacertfile := CAFile, keyfile := KeyFile, certfile := CertFile} = Config) ->
on_jsonify(#{<<"server">> := Server, <<"user">> := User, <<"database">> := DB,
<<"password">> := Passwd, <<"cacertfile">> := CAFile,
<<"keyfile">> := KeyFile, <<"certfile">> := CertFile} = Config) ->
Config#{
user => list_to_binary(User),
database => list_to_binary(DB),
password => list_to_binary(Passwd),
server => emqx_connector_schema_lib:ip_port_to_string(Server),
cacertfile => list_to_binary(CAFile),
keyfile => list_to_binary(KeyFile),
certfile => list_to_binary(CertFile)
<<"user">> => list_to_binary(User),
<<"database">> => list_to_binary(DB),
<<"password">> => list_to_binary(Passwd),
<<"server">> => emqx_connector_schema_lib:ip_port_to_string(Server),
<<"cacertfile">> => list_to_binary(CAFile),
<<"keyfile">> => list_to_binary(KeyFile),
<<"certfile">> => list_to_binary(CertFile)
}.
on_api_reply_format(ResourceData) ->
#{config := Conf} = Reply0 = emqx_resource_api:default_api_reply_format(ResourceData),
Reply0#{config => maps:without([cacertfile, keyfile, certfile, verify], Conf)}.
Reply0#{config => maps:without([<<"cacertfile">>, <<"keyfile">>,
<<"certfile">>, <<"verify">>], Conf)}.
%% ===================================================================
on_start(InstId, #{server := {Host, Port},
database := DB,
user := User,
password := Password,
auto_reconnect := AutoReconn,
pool_size := PoolSize} = Config) ->
on_start(InstId, #{<<"server">> := {Host, Port},
<<"database">> := DB,
<<"user">> := User,
<<"password">> := Password,
<<"auto_reconnect">> := AutoReconn,
<<"pool_size">> := PoolSize} = Config) ->
logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]),
SslOpts = case maps:get(ssl, Config) of
SslOpts = case maps:get(<<"ssl">>, Config) of
true ->
[{ssl, [{server_name_indication, disable} |
emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}];

View File

@ -52,57 +52,47 @@ ssl_fields() ->
, {verify, fun verify/1}
].
server(mapping) -> "config.server";
server(type) -> ip_port();
server(validator) -> [?REQUIRED("the field 'server' is required")];
server(_) -> undefined.
database(mapping) -> "config.database";
database(type) -> string();
database(validator) -> [?REQUIRED("the field 'server' is required")];
database(_) -> undefined.
pool_size(mapping) -> "config.pool_size";
pool_size(type) -> integer();
pool_size(default) -> 8;
pool_size(validator) -> [?MIN(1), ?MAX(64)];
pool_size(_) -> undefined.
user(mapping) -> "config.user";
user(type) -> string();
user(default) -> "root";
user(_) -> undefined.
password(mapping) -> "config.password";
password(type) -> string();
password(default) -> "";
password(_) -> undefined.
auto_reconnect(mapping) -> "config.auto_reconnect";
auto_reconnect(type) -> boolean();
auto_reconnect(default) -> true;
auto_reconnect(_) -> undefined.
ssl(mapping) -> "config.ssl";
ssl(type) -> boolean();
ssl(default) -> false;
ssl(_) -> undefined.
cacertfile(mapping) -> "config.cacertfile";
cacertfile(type) -> string();
cacertfile(default) -> "";
cacertfile(_) -> undefined.
keyfile(mapping) -> "config.keyfile";
keyfile(type) -> string();
keyfile(default) -> "";
keyfile(_) -> undefined.
certfile(mapping) -> "config.certfile";
certfile(type) -> string();
certfile(default) -> "";
certfile(_) -> undefined.
verify(mapping) -> "config.verify";
verify(type) -> boolean();
verify(default) -> false;
verify(_) -> undefined.

19
apps/emqx_data_bridge/.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -0,0 +1,10 @@
# emqx_data_bridge
EMQ X Data Bridge is an application that managing the resources (see emqx_resource) used by emqx
rule engine.
It provides CRUD HTTP APIs of the resources, and is also responsible for loading the resources at
startup, and saving configs of resources to `data/` after configs updated.
The application depends on `emqx_connector` as that's where all the callback modules of `connector`
resources placed.

View File

@ -0,0 +1,30 @@
##--------------------------------------------------------------------
## EMQ X Bridge Plugin
##--------------------------------------------------------------------
emqx_data_bridge.bridges: [
{name: "mysql-abc"
type: mysql
config: {
server: "127.0.0.1:3306"
database: mqtt
pool_size: 1
user: root
password: public
auto_reconnect: true
ssl: false
}
},
{name: "mysql-def"
type: mysql
config: {
server: "127.0.0.1:3306"
database: mqtt
pool_size: 1
user: root
password: public
auto_reconnect: true
ssl: false
}
}
]

View File

@ -0,0 +1,16 @@
%%-*- mode: erlang -*-
%% emqx_data_bridge config mapping
{mapping, "emqx_data_bridge.bridges", "emqx_data_bridge.bridges", [
{default, []},
{datatype, string}
]}.
% fields("emqx_data_bridge") ->
% [
% {bridges,
% [fun(mapping) -> "emqx_data_bridge.bridges";
% (type) -> list();
% (_) -> undefined
% end]}
% ]

View File

@ -0,0 +1,7 @@
{erl_opts, [debug_info]}.
{deps, []}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_data_bridge]}
]}.

View File

@ -0,0 +1,15 @@
{application, emqx_data_bridge,
[{description, "An OTP application"},
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_data_bridge_app, []}},
{applications,
[kernel,
stdlib
]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -0,0 +1,9 @@
-module(emqx_data_bridge).
-export([ load_bridges/0
]).
load_bridges() ->
Bridges = proplists:get_value(bridges,
application:get_all_env(emqx_data_bridge), []),
emqx_data_bridge_monitor:ensure_all_started(Bridges).

View File

@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_data_bridge_app).
-behaviour(application).
-emqx_plugin(?MODULE).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_data_bridge_sup:start_link(),
ok = emqx_data_bridge:load_bridges(),
{ok, Sup}.
stop(_State) ->
ok.
%% internal functions

View File

@ -0,0 +1,64 @@
%% This process monitors all the data bridges, and try to restart a bridge
%% when one of it stopped.
-module(emqx_data_bridge_monitor).
-behaviour(gen_server).
%% API functions
-export([ start_link/0
, ensure_all_started/1
]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
ensure_all_started(Configs) ->
gen_server:cast(?MODULE, {start_and_monitor, Configs}).
init([]) ->
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast({start_and_monitor, Configs}, State) ->
ok = load_bridges(Configs),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%============================================================================
load_bridges(Configs) ->
lists:foreach(fun load_bridge/1, Configs).
load_bridge(#{<<"name">> := Name, <<"type">> := Type,
<<"config">> := Config}) ->
case emqx_resource:check_and_load_instance(Name, resource_type(Type), Config) of
{ok, _} -> ok;
{error, already_created} -> ok;
{error, Reason} ->
error({load_bridge, Reason})
end.
resource_type(<<"mysql">>) -> emqx_connector_mysql.

View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_data_bridge_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_one,
intensity => 10,
period => 10},
ChildSpecs = [
#{id => emqx_data_bridge_monitor,
start => {emqx_data_bridge_monitor, start_link, []},
restart => permanent,
type => worker,
modules => [emqx_data_bridge_monitor]}],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions

View File

@ -14,10 +14,10 @@
]).
%% callbacks for emqx_resource config schema
-export([fields/1]).
-export([schema/0]).
fields(ConfPath) ->
log_tracer_schema:fields(ConfPath).
schema() ->
log_tracer_schema:schema().
on_start(InstId, Config) ->
io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]),

View File

@ -2,7 +2,7 @@
-include_lib("typerefl/include/types.hrl").
-export([fields/1]).
-export([schema/0]).
-reflect_type([t_level/0, t_cache_logs_in/0]).
@ -10,15 +10,14 @@
-type t_cache_logs_in() :: memory | file.
fields("config") ->
schema() ->
[ {condition, fun condition/1}
, {level, fun level/1}
, {enable_cache, fun enable_cache/1}
, {cache_logs_in, fun cache_logs_in/1}
, {cache_log_dir, fun cache_log_dir/1}
, {bulk, fun bulk/1}
];
fields(_) -> [].
].
condition(mapping) -> "config.condition";
condition(type) -> map();

View File

@ -37,7 +37,8 @@
%% APIs for instances
-export([ parse_config/2
-export([ check_config/2
, check_and_load_instance/3
, resource_type_from_str/1
]).
@ -73,9 +74,6 @@
, list_instances_verbose/0 %% list all the instances
, get_instance/1 %% return the data of the instance
, get_instance_by_type/1 %% return all the instances of the same resource type
, load_instances_from_dir/1 %% load instances from a directory
, load_instance_from_file/1 %% load an instance from a config file
, load_instance_from_config/1 %% load an instance from a map or json-string config
% , dependents/1
% , inc_counter/2 %% increment the counter of the instance
% , inc_counter/3 %% increment the counter by a given integer
@ -215,18 +213,6 @@ list_instances_verbose() ->
get_instance_by_type(ResourceType) ->
emqx_resource_instance:lookup_by_type(ResourceType).
-spec load_instances_from_dir(Dir :: string()) -> ok.
load_instances_from_dir(Dir) ->
emqx_resource_instance:load_dir(Dir).
-spec load_instance_from_file(File :: string()) -> ok.
load_instance_from_file(File) ->
emqx_resource_instance:load_file(File).
-spec load_instance_from_config(binary() | map()) -> ok.
load_instance_from_config(Config) ->
emqx_resource_instance:load_config(Config).
-spec call_start(instance_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
call_start(InstId, Mod, Config) ->
@ -267,24 +253,30 @@ call_api_reply_format(Mod, Data) ->
true -> ?SAFE_CALL(Mod:on_api_reply_format(Data))
end.
-spec parse_config(resource_type(), binary() | term()) ->
-spec check_config(resource_type(), binary() | term()) ->
{ok, resource_config()} | {error, term()}.
parse_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
check_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
case hocon:binary(RawConfig, #{format => richmap}) of
{ok, MapConfig} ->
do_parse_config(ResourceType, MapConfig);
do_check_config(ResourceType, MapConfig);
Error -> Error
end;
parse_config(ResourceType, RawConfigTerm) ->
parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})).
check_config(ResourceType, RawConfigTerm) ->
check_config(ResourceType, jsx:encode(#{config => RawConfigTerm})).
-spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
do_parse_config(ResourceType, MapConfig) ->
case ?SAFE_CALL(hocon_schema:generate(ResourceType, MapConfig)) of
-spec do_check_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
do_check_config(ResourceType, MapConfig) ->
case ?SAFE_CALL(hocon_schema:check(ResourceType, MapConfig)) of
{error, Reason} -> {error, Reason};
Config ->
InstConf = maps:from_list(proplists:get_value(config, Config)),
{ok, InstConf}
Config -> {ok, maps:get(<<"config">>, hocon:richmap_to_map(Config))}
end.
-spec check_and_load_instance(instance_id(), resource_type(), binary() | term()) ->
{ok, resource_data()} | {error, term()}.
check_and_load_instance(InstId, ResourceType, Config) ->
case check_config(ResourceType, Config) of
{ok, InstConf} -> emqx_resource_instance:create_local(InstId, ResourceType, InstConf);
Error -> Error
end.
%% =================================================================================

View File

@ -46,7 +46,7 @@ put(Mod, #{id := Id}, Params) ->
end.
do_put(Mod, Id, ConfigParams, ResourceType, Params) ->
case emqx_resource:parse_config(ResourceType, ConfigParams) of
case emqx_resource:check_config(ResourceType, ConfigParams) of
{ok, Config} ->
case emqx_resource:update(Id, ResourceType, Config, Params) of
{ok, Data} ->

View File

@ -23,10 +23,7 @@
-export([start_link/2]).
%% load resource instances from *.conf files
-export([ load_dir/1
, load_file/1
, load_config/1
, lookup/1
-export([ lookup/1
, list_all/0
, lookup_by_type/1
, create_local/3
@ -85,44 +82,8 @@ lookup_by_type(ResourceType) ->
[Data || #{mod := Mod} = Data <- list_all()
, Mod =:= ResourceType].
-spec load_dir(Dir :: string()) -> ok.
load_dir(Dir) ->
lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))).
load_file(File) ->
case ?SAFE_CALL(hocon_token:read(File)) of
{error, Reason} ->
logger:error("load resource from ~p failed: ~p", [File, Reason]);
RawConfig ->
case load_config(RawConfig) of
{ok, Data} ->
logger:debug("loaded resource instance from file: ~p, data: ~p",
[File, Data]);
{error, Reason} ->
logger:error("load resource from ~p failed: ~p", [File, Reason])
end
end.
-spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}.
load_config(RawConfig) when is_binary(RawConfig) ->
case hocon:binary(RawConfig, #{format => map}) of
{ok, ConfigTerm} -> load_config(ConfigTerm);
Error -> Error
end;
load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) ->
MapConfig = maps:get(<<"config">>, Config, #{}),
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
{ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig);
Error -> Error
end.
parse_and_load_config(InstId, ResourceType, MapConfig) ->
case emqx_resource:parse_config(ResourceType, MapConfig) of
{ok, InstConf} -> create_local(InstId, ResourceType, InstConf);
Error -> Error
end.
-spec create_local(instance_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, term()}.
create_local(InstId, ResourceType, InstConf) ->
case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of
{ok, Data} -> {ok, Data};

View File

@ -68,12 +68,18 @@ form(Mod, Form) ->
fix_spec_attrs() ->
[ ?Q("-export([emqx_resource_schema/0]).")
, ?Q("-export([structs/0]).")
, ?Q("-export([structs/0, fields/1]).")
, ?Q("-behaviour(hocon_schema).")
].
fix_spec_funcs(_Mod) ->
[ (?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>."))
[ ?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.")
, ?Q("structs() -> [\"config\"].")
, ?Q("fields(\"config\") -> "
"[fun (type) -> \"schema\"; "
" (_) -> undefined "
" end];"
"fields(\"schema\") -> schema()."
)
].
fix_api_attrs(Mod, Path) ->

View File

@ -7,4 +7,5 @@
{emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}.
{emqx_resource, {{enable_plugin_emqx_resource}}}.
{emqx_connector, {{enable_plugin_emqx_connector}}}.
{emqx_data_bridge, {{enable_plugin_emqx_data_bridge}}}.
{emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}.

View File

@ -182,6 +182,7 @@ overlay_vars_rel(RelType) ->
, {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
, {enable_plugin_emqx_resource, true}
, {enable_plugin_emqx_connector, true}
, {enable_plugin_emqx_data_bridge, true}
, {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
, {enable_plugin_emqx_recon, true}
, {enable_plugin_emqx_retainer, true}
@ -277,6 +278,7 @@ relx_plugin_apps(ReleaseType) ->
, emqx_recon
, emqx_resource
, emqx_connector
, emqx_data_bridge
, emqx_rule_engine
, emqx_sasl
]