chore(merge): Merge branch 'emqx_connector'

This commit is contained in:
Shawn 2021-06-08 10:01:22 +08:00
commit 4ecf469cbf
32 changed files with 1034 additions and 195 deletions

19
apps/emqx_connector/.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -0,0 +1,27 @@
# emqx_connector
This application is a collection of `connectors`.
A `connector` is a callback module of `emqx_resource` that maintains the data related to
external resources. Put all resource related callback modules in a single application is good as
we can put some util functions/modules here for reusing purpose.
For example, a mysql connector is an emqx resource that maintains all the mysql connection
related parameters (configs) and the TCP connections to the mysql server.
An mysql connector can be used as following:
```
(emqx@127.0.0.1)5> emqx_resource:list_instances_verbose().
[#{config =>
#{auto_reconnect => true,cacertfile => [],certfile => [],
database => "mqtt",keyfile => [],password => "public",
pool_size => 1,
server => {{127,0,0,1},3306},
ssl => false,user => "root",verify => false},
id => <<"mysql-abc">>,mod => emqx_connector_mysql,
state => #{poolname => 'mysql-abc'},
status => started}]
(emqx@127.0.0.1)6> emqx_resource:query(<<"mysql-abc">>, {sql, <<"SELECT count(1)">>}).
{ok,[<<"count(1)">>],[[1]]}
```

View File

@ -0,0 +1,4 @@
##--------------------------------------------------------------------
## EMQ X CONNECTOR Plugin
##--------------------------------------------------------------------

View File

@ -0,0 +1,2 @@
%%-*- mode: erlang -*-
%% emqx_connector config mapping

View File

@ -0,0 +1,13 @@
{erl_opts, [
nowarn_unused_import,
debug_info
]}.
{deps, [
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_connector]}
]}.

View File

@ -0,0 +1,17 @@
{application, emqx_connector,
[{description, "An OTP application"},
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications,
[kernel,
stdlib,
emqx_resource,
ecpool
]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -0,0 +1,16 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector).

View File

@ -0,0 +1,31 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_app).
-behaviour(application).
-emqx_plugin(?MODULE).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
emqx_connector_sup:start_link().
stop(_State) ->
ok.
%% internal functions

View File

@ -0,0 +1,106 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mysql).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-export([ on_jsonify/1
]).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
, on_query/4
, on_health_check/2
]).
-export([connect/1]).
-export([do_health_check/1]).
%%=====================================================================
schema() ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
on_jsonify(#{<<"server">> := Server, <<"user">> := User, <<"database">> := DB,
<<"password">> := Passwd, <<"cacertfile">> := CAFile,
<<"keyfile">> := KeyFile, <<"certfile">> := CertFile} = Config) ->
Config#{
<<"user">> => list_to_binary(User),
<<"database">> => list_to_binary(DB),
<<"password">> => list_to_binary(Passwd),
<<"server">> => emqx_connector_schema_lib:ip_port_to_string(Server),
<<"cacertfile">> => list_to_binary(CAFile),
<<"keyfile">> => list_to_binary(KeyFile),
<<"certfile">> => list_to_binary(CertFile)
}.
%% ===================================================================
on_start(InstId, #{<<"server">> := {Host, Port},
<<"database">> := DB,
<<"user">> := User,
<<"password">> := Password,
<<"auto_reconnect">> := AutoReconn,
<<"pool_size">> := PoolSize} = Config) ->
logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]),
SslOpts = case maps:get(<<"ssl">>, Config) of
true ->
[{ssl, [{server_name_indication, disable} |
emqx_plugin_libs_ssl:save_files_return_opts(Config, "connectors", InstId)]}];
false ->
[]
end,
Options = [{host, Host},
{port, Port},
{user, User},
{password, Password},
{database, DB},
{auto_reconnect, reconn_interval(AutoReconn)},
{pool_size, PoolSize}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts),
{ok, #{poolname => PoolName}}.
on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping mysql connector: ~p", [InstId]),
emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]),
case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of
{error, Reason} ->
logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]),
emqx_resource:query_failed(AfterQuery);
_ ->
emqx_resource:query_success(AfterQuery)
end,
Result.
on_health_check(_InstId, #{poolname := PoolName} = State) ->
emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
do_health_check(Conn) ->
ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
%% ===================================================================
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
connect(Options) ->
mysql:start_link(Options).

View File

@ -0,0 +1,111 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_schema_lib).
-include_lib("typerefl/include/types.hrl").
-export([ relational_db_fields/0
, ssl_fields/0
]).
-export([ to_ip_port/1
, ip_port_to_string/1
]).
-typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}).
-reflect_type([ip_port/0]).
-type ip_port() :: tuple().
-define(VALID, emqx_resource_validator).
-define(REQUIRED(MSG), ?VALID:required(MSG)).
-define(MAX(MAXV), ?VALID:max(number, MAXV)).
-define(MIN(MINV), ?VALID:min(number, MINV)).
relational_db_fields() ->
[ {server, fun server/1}
, {database, fun database/1}
, {pool_size, fun pool_size/1}
, {user, fun user/1}
, {password, fun password/1}
, {auto_reconnect, fun auto_reconnect/1}
].
ssl_fields() ->
[ {ssl, fun ssl/1}
, {cacertfile, fun cacertfile/1}
, {keyfile, fun keyfile/1}
, {certfile, fun certfile/1}
, {verify, fun verify/1}
].
server(type) -> ip_port();
server(validator) -> [?REQUIRED("the field 'server' is required")];
server(_) -> undefined.
database(type) -> string();
database(validator) -> [?REQUIRED("the field 'server' is required")];
database(_) -> undefined.
pool_size(type) -> integer();
pool_size(default) -> 8;
pool_size(validator) -> [?MIN(1), ?MAX(64)];
pool_size(_) -> undefined.
user(type) -> string();
user(default) -> "root";
user(_) -> undefined.
password(type) -> string();
password(default) -> "";
password(_) -> undefined.
auto_reconnect(type) -> boolean();
auto_reconnect(default) -> true;
auto_reconnect(_) -> undefined.
ssl(type) -> boolean();
ssl(default) -> false;
ssl(_) -> undefined.
cacertfile(type) -> string();
cacertfile(default) -> "";
cacertfile(_) -> undefined.
keyfile(type) -> string();
keyfile(default) -> "";
keyfile(_) -> undefined.
certfile(type) -> string();
certfile(default) -> "";
certfile(_) -> undefined.
verify(type) -> boolean();
verify(default) -> false;
verify(_) -> undefined.
to_ip_port(Str) ->
case string:tokens(Str, ":") of
[Ip, Port] ->
case inet:parse_address(Ip) of
{ok, R} -> {ok, {R, list_to_integer(Port)}};
_ -> {error, Str}
end;
_ -> {error, Str}
end.
ip_port_to_string({Ip, Port}) ->
iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]).

View File

@ -0,0 +1,36 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions

19
apps/emqx_data_bridge/.gitignore vendored Normal file
View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -0,0 +1,10 @@
# emqx_data_bridge
EMQ X Data Bridge is an application that managing the resources (see emqx_resource) used by emqx
rule engine.
It provides CRUD HTTP APIs of the resources, and is also responsible for loading the resources at
startup, and saving configs of resources to `data/` after configs updated.
The application depends on `emqx_connector` as that's where all the callback modules of `connector`
resources placed.

View File

@ -0,0 +1,30 @@
##--------------------------------------------------------------------
## EMQ X Bridge Plugin
##--------------------------------------------------------------------
emqx_data_bridge.bridges: [
{name: "mysql-abc"
type: mysql
config: {
server: "127.0.0.1:3306"
database: mqtt
pool_size: 1
user: root
password: public
auto_reconnect: true
ssl: false
}
},
{name: "mysql-def"
type: mysql
config: {
server: "127.0.0.1:3306"
database: mqtt
pool_size: 1
user: root
password: public
auto_reconnect: true
ssl: false
}
}
]

View File

@ -0,0 +1,16 @@
%%-*- mode: erlang -*-
%% emqx_data_bridge config mapping
{mapping, "emqx_data_bridge.bridges", "emqx_data_bridge.bridges", [
{default, []},
{datatype, string}
]}.
% fields("emqx_data_bridge") ->
% [
% {bridges,
% [fun(mapping) -> "emqx_data_bridge.bridges";
% (type) -> list();
% (_) -> undefined
% end]}
% ]

View File

@ -0,0 +1,7 @@
{erl_opts, [debug_info]}.
{deps, []}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_data_bridge]}
]}.

View File

@ -0,0 +1,15 @@
{application, emqx_data_bridge,
[{description, "An OTP application"},
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_data_bridge_app, []}},
{applications,
[kernel,
stdlib
]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -0,0 +1,48 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_data_bridge).
-export([ load_bridges/0
, resource_type/1
, bridge_type/1
, name_to_resource_id/1
, resource_id_to_name/1
, list_bridges/0
, is_bridge/1
]).
load_bridges() ->
Bridges = proplists:get_value(bridges,
application:get_all_env(emqx_data_bridge), []),
emqx_data_bridge_monitor:ensure_all_started(Bridges).
resource_type(<<"mysql">>) -> emqx_connector_mysql.
bridge_type(emqx_connector_mysql) -> <<"mysql">>.
name_to_resource_id(BridgeName) ->
<<"bridge:", BridgeName/binary>>.
resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) ->
BridgeName.
list_bridges() ->
emqx_resource_api:list_instances(fun emqx_data_bridge:is_bridge/1).
is_bridge(#{id := <<"bridge:", _/binary>>}) ->
true;
is_bridge(_Data) ->
false.

View File

@ -0,0 +1,114 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_data_bridge_api).
-rest_api(#{ name => list_data_bridges
, method => 'GET'
, path => "/data_bridges"
, func => list_bridges
, descr => "List all data bridges"
}).
-rest_api(#{ name => get_data_bridge
, method => 'GET'
, path => "/data_bridges/:bin:name"
, func => get_bridge
, descr => "Get a data bridge by name"
}).
-rest_api(#{ name => create_data_bridge
, method => 'POST'
, path => "/data_bridges/:bin:name"
, func => create_bridge
, descr => "Create a new data bridge"
}).
-rest_api(#{ name => update_data_bridge
, method => 'POST'
, path => "/data_bridges/:bin:name"
, func => update_bridge
, descr => "Update an existing data bridge"
}).
-rest_api(#{ name => delete_data_bridge
, method => 'DELETE'
, path => "/data_bridges/:bin:name"
, func => delete_bridge
, descr => "Delete an existing data bridge"
}).
-export([ list_bridges/2
, get_bridge/2
, create_bridge/2
, update_bridge/2
, delete_bridge/2
]).
list_bridges(_Binding, _Params) ->
{200, #{code => 0, data => [format_api_reply(Data) ||
Data <- emqx_data_bridge:list_bridges()]}}.
get_bridge(#{name := Name}, _Params) ->
case emqx_resource:get_instance(emqx_data_bridge:name_to_resource_id(Name)) of
{ok, Data} ->
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
{error, not_found} ->
{404, #{code => 102, message => <<"not_found: ", Name/binary>>}}
end.
create_bridge(#{name := Name}, Params) ->
Config = proplists:get_value(<<"config">>, Params),
BridgeType = proplists:get_value(<<"type">>, Params),
case emqx_resource:check_and_create(
emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(BridgeType), Config) of
{ok, Data} ->
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
{error, already_created} ->
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
{error, Reason0} ->
Reason = emqx_resource_api:stringnify(Reason0),
{500, #{code => 102, message => <<"create bridge ", Name/binary,
" failed:", Reason/binary>>}}
end.
update_bridge(#{name := Name}, Params) ->
Config = proplists:get_value(<<"config">>, Params),
BridgeType = proplists:get_value(<<"type">>, Params),
case emqx_resource:check_and_update(
emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(BridgeType), Config, []) of
{ok, Data} ->
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
{error, not_found} ->
{400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}};
{error, Reason0} ->
Reason = emqx_resource_api:stringnify(Reason0),
{500, #{code => 102, message => <<"update bridge ", Name/binary,
" failed:", Reason/binary>>}}
end.
delete_bridge(#{name := Name}, _Params) ->
case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of
ok -> {200, #{code => 0, data => #{}}};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end.
format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := Status}) ->
#{type => emqx_data_bridge:bridge_type(Type),
name => emqx_data_bridge:resource_id_to_name(Id),
config => Conf, status => Status}.

View File

@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_data_bridge_app).
-behaviour(application).
-emqx_plugin(?MODULE).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_data_bridge_sup:start_link(),
ok = emqx_data_bridge:load_bridges(),
{ok, Sup}.
stop(_State) ->
ok.
%% internal functions

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_data_bridge_config_handler).
-behaviour(gen_server).
%% API functions
-export([ start_link/0
, notify_updated/0
]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
notify_updated() ->
gen_server:cast(?MODULE, updated).
init([]) ->
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(updated, State) ->
Configs = [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()],
emqx_config_handler ! {emqx_data_bridge, Configs},
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%============================================================================
format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
#{type => Type, name => emqx_data_bridge:resource_id_to_name(Id),
config => Conf}.

View File

@ -0,0 +1,79 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% This process monitors all the data bridges, and try to restart a bridge
%% when one of it stopped.
-module(emqx_data_bridge_monitor).
-behaviour(gen_server).
%% API functions
-export([ start_link/0
, ensure_all_started/1
]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
ensure_all_started(Configs) ->
gen_server:cast(?MODULE, {start_and_monitor, Configs}).
init([]) ->
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast({start_and_monitor, Configs}, State) ->
ok = load_bridges(Configs),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%============================================================================
load_bridges(Configs) ->
lists:foreach(fun load_bridge/1, Configs).
load_bridge(#{<<"name">> := Name, <<"type">> := Type,
<<"config">> := Config}) ->
case emqx_resource:check_and_create_local(
emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(Type), Config) of
{ok, _} -> ok;
{error, already_created} -> ok;
{error, Reason} ->
error({load_bridge, Reason})
end.

View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_data_bridge_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_one,
intensity => 10,
period => 10},
ChildSpecs = [
#{id => emqx_data_bridge_monitor,
start => {emqx_data_bridge_monitor, start_link, []},
restart => permanent,
type => worker,
modules => [emqx_data_bridge_monitor]}],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions

View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugin_libs_pool).
-export([ start_pool/3
, stop_pool/1
, pool_name/1
, health_check/3
]).
pool_name(ID) when is_binary(ID) ->
list_to_atom(binary_to_list(ID)).
start_pool(Name, Mod, Options) ->
case ecpool:start_sup_pool(Name, Mod, Options) of
{ok, _} -> logger:log(info, "Initiated ~0p Successfully", [Name]);
{error, {already_started, _Pid}} ->
stop_pool(Name),
start_pool(Name, Mod, Options);
{error, Reason} ->
logger:log(error, "Initiate ~0p failed ~0p", [Name, Reason]),
error({start_pool_failed, Name})
end.
stop_pool(Name) ->
case ecpool:stop_sup_pool(Name) of
ok -> logger:log(info, "Destroyed ~0p Successfully", [Name]);
{error, not_found} -> ok;
{error, Reason} ->
logger:log(error, "Destroy ~0p failed, ~0p", [Name, Reason]),
error({stop_pool_failed, Name})
end.
health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) ->
Status = [begin
case ecpool_worker:client(Worker) of
{ok, Conn} -> CheckFunc(Conn);
_ -> false
end
end || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of
true -> {ok, State};
false -> {error, test_query_failed}
end.

View File

@ -14,10 +14,10 @@
]). ]).
%% callbacks for emqx_resource config schema %% callbacks for emqx_resource config schema
-export([fields/1]). -export([schema/0]).
fields(ConfPath) -> schema() ->
log_tracer_schema:fields(ConfPath). log_tracer_schema:schema().
on_start(InstId, Config) -> on_start(InstId, Config) ->
io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]), io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]),

View File

@ -2,7 +2,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-export([fields/1]). -export([schema/0]).
-reflect_type([t_level/0, t_cache_logs_in/0]). -reflect_type([t_level/0, t_cache_logs_in/0]).
@ -10,15 +10,14 @@
-type t_cache_logs_in() :: memory | file. -type t_cache_logs_in() :: memory | file.
fields("config") -> schema() ->
[ {condition, fun condition/1} [ {condition, fun condition/1}
, {level, fun level/1} , {level, fun level/1}
, {enable_cache, fun enable_cache/1} , {enable_cache, fun enable_cache/1}
, {cache_logs_in, fun cache_logs_in/1} , {cache_logs_in, fun cache_logs_in/1}
, {cache_log_dir, fun cache_log_dir/1} , {cache_log_dir, fun cache_log_dir/1}
, {bulk, fun bulk/1} , {bulk, fun bulk/1}
]; ].
fields(_) -> [].
condition(mapping) -> "config.condition"; condition(mapping) -> "config.condition";
condition(type) -> map(); condition(type) -> map();

View File

@ -37,7 +37,11 @@
%% APIs for instances %% APIs for instances
-export([ parse_config/2 -export([ check_config/2
, check_and_create/3
, check_and_create_local/3
, check_and_update/4
, check_and_update_local/4
, resource_type_from_str/1 , resource_type_from_str/1
]). ]).
@ -45,10 +49,13 @@
%% provisional solution: rpc:multical to all the nodes for creating/updating/removing %% provisional solution: rpc:multical to all the nodes for creating/updating/removing
%% todo: replicate operations %% todo: replicate operations
-export([ create/3 %% store the config and start the instance -export([ create/3 %% store the config and start the instance
, create_local/3
, create_dry_run/3 %% run start/2, health_check/2 and stop/1 sequentially , create_dry_run/3 %% run start/2, health_check/2 and stop/1 sequentially
, create_dry_run_local/3
, update/4 %% update the config, stop the old instance and start the new one , update/4 %% update the config, stop the old instance and start the new one
%% it will create a new resource when the id does not exist , update_local/4
, remove/1 %% remove the config and stop the instance , remove/1 %% remove the config and stop the instance
, remove_local/1
]). ]).
%% Calls to the callback module with current resource state %% Calls to the callback module with current resource state
@ -66,16 +73,12 @@
, call_stop/3 %% stop the instance , call_stop/3 %% stop the instance
, call_config_merge/4 %% merge the config when updating , call_config_merge/4 %% merge the config when updating
, call_jsonify/2 , call_jsonify/2
, call_api_reply_format/2
]). ]).
-export([ list_instances/0 %% list all the instances, id only. -export([ list_instances/0 %% list all the instances, id only.
, list_instances_verbose/0 %% list all the instances , list_instances_verbose/0 %% list all the instances
, get_instance/1 %% return the data of the instance , get_instance/1 %% return the data of the instance
, get_instance_by_type/1 %% return all the instances of the same resource type , list_instances_by_type/1 %% return all the instances of the same resource type
, load_instances_from_dir/1 %% load instances from a directory
, load_instance_from_file/1 %% load an instance from a config file
, load_instance_from_config/1 %% load an instance from a map or json-string config
% , dependents/1 % , dependents/1
% , inc_counter/2 %% increment the counter of the instance % , inc_counter/2 %% increment the counter of the instance
% , inc_counter/3 %% increment the counter by a given integer % , inc_counter/3 %% increment the counter by a given integer
@ -154,22 +157,42 @@ query_failed({_, {OnFailed, Args}}) ->
-spec create(instance_id(), resource_type(), resource_config()) -> -spec create(instance_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
create(InstId, ResourceType, Config) -> create(InstId, ResourceType, Config) ->
?CLUSTER_CALL(call_instance, [InstId, {create, InstId, ResourceType, Config}], {ok, _}). ?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}).
-spec create_local(instance_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, Reason :: term()}.
create_local(InstId, ResourceType, Config) ->
call_instance(InstId, {create, InstId, ResourceType, Config}).
-spec create_dry_run(instance_id(), resource_type(), resource_config()) -> -spec create_dry_run(instance_id(), resource_type(), resource_config()) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
create_dry_run(InstId, ResourceType, Config) -> create_dry_run(InstId, ResourceType, Config) ->
?CLUSTER_CALL(call_instance, [InstId, {create_dry_run, InstId, ResourceType, Config}]). ?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]).
-spec create_dry_run_local(instance_id(), resource_type(), resource_config()) ->
ok | {error, Reason :: term()}.
create_dry_run_local(InstId, ResourceType, Config) ->
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
-spec update(instance_id(), resource_type(), resource_config(), term()) -> -spec update(instance_id(), resource_type(), resource_config(), term()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
update(InstId, ResourceType, Config, Params) -> update(InstId, ResourceType, Config, Params) ->
?CLUSTER_CALL(call_instance, [InstId, {update, InstId, ResourceType, Config, Params}], {ok, _}). ?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}).
-spec update_local(instance_id(), resource_type(), resource_config(), term()) ->
{ok, resource_data()} | {error, Reason :: term()}.
update_local(InstId, ResourceType, Config, Params) ->
call_instance(InstId, {update, InstId, ResourceType, Config, Params}).
-spec remove(instance_id()) -> ok | {error, Reason :: term()}. -spec remove(instance_id()) -> ok | {error, Reason :: term()}.
remove(InstId) -> remove(InstId) ->
?CLUSTER_CALL(call_instance, [InstId, {remove, InstId}]). ?CLUSTER_CALL(remove_local, [InstId]).
-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
remove_local(InstId) ->
call_instance(InstId, {remove, InstId}).
%% =================================================================================
-spec query(instance_id(), Request :: term()) -> Result :: term(). -spec query(instance_id(), Request :: term()) -> Result :: term().
query(InstId, Request) -> query(InstId, Request) ->
query(InstId, Request, undefined). query(InstId, Request, undefined).
@ -211,22 +234,10 @@ list_instances() ->
list_instances_verbose() -> list_instances_verbose() ->
emqx_resource_instance:list_all(). emqx_resource_instance:list_all().
-spec get_instance_by_type(module()) -> [resource_data()]. -spec list_instances_by_type(module()) -> [resource_data()].
get_instance_by_type(ResourceType) -> list_instances_by_type(ResourceType) ->
emqx_resource_instance:lookup_by_type(ResourceType). emqx_resource_instance:lookup_by_type(ResourceType).
-spec load_instances_from_dir(Dir :: string()) -> ok.
load_instances_from_dir(Dir) ->
emqx_resource_instance:load_dir(Dir).
-spec load_instance_from_file(File :: string()) -> ok.
load_instance_from_file(File) ->
emqx_resource_instance:load_file(File).
-spec load_instance_from_config(binary() | map()) -> {ok, resource_data()} | {error, term()}.
load_instance_from_config(Config) ->
emqx_resource_instance:load_config(Config).
-spec call_start(instance_id(), module(), resource_config()) -> -spec call_start(instance_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
call_start(InstId, Mod, Config) -> call_start(InstId, Mod, Config) ->
@ -260,31 +271,52 @@ call_jsonify(Mod, Config) ->
true -> ?SAFE_CALL(Mod:on_jsonify(Config)) true -> ?SAFE_CALL(Mod:on_jsonify(Config))
end. end.
-spec call_api_reply_format(module(), resource_data()) -> jsx:json_term(). -spec check_config(resource_type(), binary() | term()) ->
call_api_reply_format(Mod, Data) ->
case erlang:function_exported(Mod, on_api_reply_format, 1) of
false -> emqx_resource_api:default_api_reply_format(Data);
true -> ?SAFE_CALL(Mod:on_api_reply_format(Data))
end.
-spec parse_config(resource_type(), binary() | term()) ->
{ok, resource_config()} | {error, term()}. {ok, resource_config()} | {error, term()}.
parse_config(ResourceType, RawConfig) when is_binary(RawConfig) -> check_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
case hocon:binary(RawConfig, #{format => richmap}) of case hocon:binary(RawConfig, #{format => richmap}) of
{ok, MapConfig} -> {ok, MapConfig} ->
do_parse_config(ResourceType, MapConfig); do_check_config(ResourceType, MapConfig);
Error -> Error Error -> Error
end; end;
parse_config(ResourceType, RawConfigTerm) -> check_config(ResourceType, RawConfigTerm) ->
parse_config(ResourceType, jsx:encode(#{config => RawConfigTerm})). check_config(ResourceType, jsx:encode(#{config => RawConfigTerm})).
-spec do_parse_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}. -spec do_check_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
do_parse_config(ResourceType, MapConfig) -> do_check_config(ResourceType, MapConfig) ->
case ?SAFE_CALL(hocon_schema:generate(ResourceType, MapConfig)) of case ?SAFE_CALL(hocon_schema:check(ResourceType, MapConfig)) of
{error, Reason} -> {error, Reason}; {error, Reason} -> {error, Reason};
Config -> Config -> {ok, maps:get(<<"config">>, hocon_schema:richmap_to_map(Config))}
InstConf = maps:from_list(proplists:get_value(config, Config)), end.
{ok, InstConf}
-spec check_and_create(instance_id(), resource_type(), binary() | term()) ->
{ok, resource_data()} | {error, term()}.
check_and_create(InstId, ResourceType, Config) ->
check_and_do(ResourceType, Config,
fun(InstConf) -> create(InstId, ResourceType, InstConf) end).
-spec check_and_create_local(instance_id(), resource_type(), binary() | term()) ->
{ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, ResourceType, Config) ->
check_and_do(ResourceType, Config,
fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end).
-spec check_and_update(instance_id(), resource_type(), binary() | term(), term()) ->
{ok, resource_data()} | {error, term()}.
check_and_update(InstId, ResourceType, Config, Params) ->
check_and_do(ResourceType, Config,
fun(InstConf) -> update(InstId, ResourceType, InstConf, Params) end).
-spec check_and_update_local(instance_id(), resource_type(), binary() | term(), term()) ->
{ok, resource_data()} | {error, term()}.
check_and_update_local(InstId, ResourceType, Config, Params) ->
check_and_do(ResourceType, Config,
fun(InstConf) -> update_local(InstId, ResourceType, InstConf, Params) end).
check_and_do(ResourceType, Config, Do) when is_function(Do) ->
case check_config(ResourceType, Config) of
{ok, InstConf} -> Do(InstConf);
Error -> Error
end. end.
%% ================================================================================= %% =================================================================================

View File

@ -15,61 +15,16 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_resource_api). -module(emqx_resource_api).
-export([ get_all/3 -export([ list_instances/1
, get/3 , format_data/1
, put/3 , stringnify/1
, delete/3
]). ]).
-export([default_api_reply_format/1]). list_instances(Filter) ->
[format_data(Data) || Data <- emqx_resource:list_instances_verbose(), Filter(Data)].
get_all(Mod, _Binding, _Params) -> format_data(#{id := Id, mod := Mod, status := Status, config := Config}) ->
{200, #{code => 0, data => #{id => Id, status => Status, resource_type => Mod,
[format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}.
get(Mod, #{id := Id}, _Params) ->
case emqx_resource:get_instance(stringnify(Id)) of
{ok, Data} ->
{200, #{code => 0, data => format_data(Mod, Data)}};
{error, not_found} ->
{404, #{code => 102, message => {resource_instance_not_found, stringnify(Id)}}}
end.
put(Mod, #{id := Id}, Params) ->
ConfigParams = proplists:get_value(<<"config">>, Params),
ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}),
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
{ok, ResourceType} ->
do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params);
{error, Reason} ->
{404, #{code => 102, message => stringnify(Reason)}}
end.
do_put(Mod, Id, ConfigParams, ResourceType, Params) ->
case emqx_resource:parse_config(ResourceType, ConfigParams) of
{ok, Config} ->
case emqx_resource:update(Id, ResourceType, Config, Params) of
{ok, Data} ->
{200, #{code => 0, data => format_data(Mod, Data)}};
{error, Reason} ->
{500, #{code => 102, message => stringnify(Reason)}}
end;
{error, Reason} ->
{400, #{code => 108, message => stringnify(Reason)}}
end.
delete(_Mod, #{id := Id}, _Params) ->
case emqx_resource:remove(stringnify(Id)) of
ok -> {200, #{code => 0, data => #{}}};
{error, Reason} ->
{500, #{code => 102, message => stringnify(Reason)}}
end.
format_data(Mod, Data) ->
emqx_resource:call_api_reply_format(Mod, Data).
default_api_reply_format(#{id := Id, mod := Mod, status := Status, config := Config}) ->
#{node => node(), id => Id, status => Status, resource_type => Mod,
config => emqx_resource:call_jsonify(Mod, Config)}. config => emqx_resource:call_jsonify(Mod, Config)}.
stringnify(Bin) when is_binary(Bin) -> Bin; stringnify(Bin) when is_binary(Bin) -> Bin;

View File

@ -23,10 +23,7 @@
-export([start_link/2]). -export([start_link/2]).
%% load resource instances from *.conf files %% load resource instances from *.conf files
-export([ load_dir/1 -export([ lookup/1
, load_file/1
, load_config/1
, lookup/1
, list_all/0 , list_all/0
, lookup_by_type/1 , lookup_by_type/1
, create_local/3 , create_local/3
@ -85,44 +82,8 @@ lookup_by_type(ResourceType) ->
[Data || #{mod := Mod} = Data <- list_all() [Data || #{mod := Mod} = Data <- list_all()
, Mod =:= ResourceType]. , Mod =:= ResourceType].
-spec load_dir(Dir :: string()) -> ok. -spec create_local(instance_id(), resource_type(), resource_config()) ->
load_dir(Dir) -> {ok, resource_data()} | {error, term()}.
lists:foreach(fun load_file/1, filelib:wildcard(filename:join([Dir, "*.conf"]))).
load_file(File) ->
case ?SAFE_CALL(hocon_token:read(File)) of
{error, Reason} ->
logger:error("load resource from ~p failed: ~p", [File, Reason]);
RawConfig ->
case load_config(RawConfig) of
{ok, Data} ->
logger:debug("loaded resource instance from file: ~p, data: ~p",
[File, Data]);
{error, Reason} ->
logger:error("load resource from ~p failed: ~p", [File, Reason])
end
end.
-spec load_config(binary() | map()) -> {ok, resource_data()} | {error, term()}.
load_config(RawConfig) when is_binary(RawConfig) ->
case hocon:binary(RawConfig, #{format => map}) of
{ok, ConfigTerm} -> load_config(ConfigTerm);
Error -> Error
end;
load_config(#{<<"id">> := Id, <<"resource_type">> := ResourceTypeStr} = Config) ->
MapConfig = maps:get(<<"config">>, Config, #{}),
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
{ok, ResourceType} -> parse_and_load_config(Id, ResourceType, MapConfig);
Error -> Error
end.
parse_and_load_config(InstId, ResourceType, MapConfig) ->
case emqx_resource:parse_config(ResourceType, MapConfig) of
{ok, InstConf} -> create_local(InstId, ResourceType, InstConf);
Error -> Error
end.
create_local(InstId, ResourceType, InstConf) -> create_local(InstId, ResourceType, InstConf) ->
case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of
{ok, Data} -> {ok, Data}; {ok, Data} -> {ok, Data};
@ -206,7 +167,7 @@ do_update(InstId, ResourceType, NewConfig, Params) ->
{ok, #{mod := Mod}} when Mod =/= ResourceType -> {ok, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type}; {error, updating_to_incorrect_resource_type};
{error, not_found} -> {error, not_found} ->
do_create(InstId, ResourceType, NewConfig) {error, not_found}
end. end.
do_create(InstId, ResourceType, Config) -> do_create(InstId, ResourceType, Config) ->

View File

@ -22,7 +22,7 @@
parse_transform(Forms, _Opts) -> parse_transform(Forms, _Opts) ->
Mod = hd([M || {attribute, _, module, M} <- Forms]), Mod = hd([M || {attribute, _, module, M} <- Forms]),
AST = trans(Mod, proplists:delete(eof, Forms)), AST = trans(Mod, proplists:delete(eof, Forms)),
debug_print(Mod, AST), _ = debug_print(Mod, AST),
AST. AST.
-ifdef(RESOURCE_DEBUG). -ifdef(RESOURCE_DEBUG).
@ -47,68 +47,34 @@ trans(Mod, Forms) ->
forms(Mod, [F0 | Fs0]) -> forms(Mod, [F0 | Fs0]) ->
case form(Mod, F0) of case form(Mod, F0) of
{CurrForm, AppendedForms} -> {CurrForms, AppendedForms} ->
CurrForm ++ forms(Mod, Fs0) ++ AppendedForms; CurrForms ++ forms(Mod, Fs0) ++ AppendedForms;
{AHeadForms, CurrForm, AppendedForms} -> {CurrForms, FollowerForms, AppendedForms} ->
AHeadForms ++ CurrForm ++ forms(Mod, Fs0) ++ AppendedForms CurrForms ++ FollowerForms ++ forms(Mod, Fs0) ++ AppendedForms
end; end;
forms(_, []) -> []. forms(_, []) -> [].
form(Mod, Form) -> form(Mod, Form) ->
case Form of case Form of
?Q("-emqx_resource_api_path('@Path').") -> ?Q("-module('@_').") ->
{fix_spec_attrs() ++ fix_api_attrs(Mod, erl_syntax:concrete(Path)) {[Form], fix_spec_attrs(), fix_spec_funcs(Mod)};
++ fix_api_exports(),
[],
fix_spec_funcs(Mod) ++ fix_api_funcs(Mod)};
_ -> _ ->
%io:format("---other form: ~p~n", [Form]), %io:format("---other form: ~p~n", [Form]),
{[], [Form], []} {[Form], [], []}
end. end.
fix_spec_attrs() -> fix_spec_attrs() ->
[ ?Q("-export([emqx_resource_schema/0]).") [ ?Q("-export([emqx_resource_schema/0]).")
, ?Q("-export([structs/0]).") , ?Q("-export([structs/0, fields/1]).")
, ?Q("-behaviour(hocon_schema).") , ?Q("-behaviour(hocon_schema).")
]. ].
fix_spec_funcs(_Mod) -> fix_spec_funcs(_Mod) ->
[ (?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.")) [ ?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.")
, ?Q("structs() -> [\"config\"].") , ?Q("structs() -> [\"config\"].")
, ?Q("fields(\"config\") -> "
"[fun (type) -> \"schema\"; "
" (_) -> undefined "
" end];"
"fields(\"schema\") -> schema()."
)
]. ].
fix_api_attrs(Mod, Path) ->
BaseName = atom_to_list(Mod),
[erl_syntax:revert(
erl_syntax:attribute(?Q("rest_api"), [
erl_syntax:abstract(#{
name => list_to_atom(Act ++ "_" ++ BaseName),
method => Method,
path => mk_path(Path, WithId),
func => Func,
descr => Act ++ " the " ++ BaseName})]))
|| {Act, Method, WithId, Func} <- [
{"list", 'GET', noid, api_get_all},
{"get", 'GET', id, api_get},
{"update", 'PUT', id, api_put},
{"delete", 'DELETE', id, api_delete}]].
fix_api_exports() ->
[?Q("-export([api_get_all/2, api_get/2, api_put/2, api_delete/2]).")].
fix_api_funcs(Mod) ->
[erl_syntax:revert(?Q(
"api_get_all(Binding, Params) ->
emqx_resource_api:get_all('@Mod@', Binding, Params).")),
erl_syntax:revert(?Q(
"api_get(Binding, Params) ->
emqx_resource_api:get('@Mod@', Binding, Params).")),
erl_syntax:revert(?Q(
"api_put(Binding, Params) ->
emqx_resource_api:put('@Mod@', Binding, Params).")),
erl_syntax:revert(?Q(
"api_delete(Binding, Params) ->
emqx_resource_api:delete('@Mod@', Binding, Params)."))
].
mk_path(Path, id) -> string:trim(Path, trailing, "/") ++ "/:bin:id";
mk_path(Path, noid) -> Path.

View File

@ -6,4 +6,6 @@
{emqx_telemetry, {{enable_plugin_emqx_telemetry}}}. {emqx_telemetry, {{enable_plugin_emqx_telemetry}}}.
{emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}. {emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}.
{emqx_resource, {{enable_plugin_emqx_resource}}}. {emqx_resource, {{enable_plugin_emqx_resource}}}.
{emqx_connector, {{enable_plugin_emqx_connector}}}.
{emqx_data_bridge, {{enable_plugin_emqx_data_bridge}}}.
{emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}. {emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}.

View File

@ -189,6 +189,8 @@ overlay_vars_rel(RelType) ->
[ {enable_plugin_emqx_rule_engine, RelType =:= cloud} [ {enable_plugin_emqx_rule_engine, RelType =:= cloud}
, {enable_plugin_emqx_bridge_mqtt, RelType =:= edge} , {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
, {enable_plugin_emqx_resource, true} , {enable_plugin_emqx_resource, true}
, {enable_plugin_emqx_connector, true}
, {enable_plugin_emqx_data_bridge, true}
, {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce , {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
, {enable_plugin_emqx_recon, true} , {enable_plugin_emqx_recon, true}
, {enable_plugin_emqx_retainer, true} , {enable_plugin_emqx_retainer, true}
@ -284,6 +286,8 @@ relx_plugin_apps(ReleaseType) ->
, emqx_web_hook , emqx_web_hook
, emqx_recon , emqx_recon
, emqx_resource , emqx_resource
, emqx_connector
, emqx_data_bridge
, emqx_rule_engine , emqx_rule_engine
, emqx_sasl , emqx_sasl
] ]