change resource,connectors,data_bridges as normal apps (#5034)

This commit is contained in:
Shawn 2021-06-19 16:27:21 +08:00 committed by zhanghongtong
parent f004e36e28
commit e2d96e46a0
24 changed files with 103 additions and 142 deletions

View File

@ -16,7 +16,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.6.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}

View File

@ -1,4 +0,0 @@
##--------------------------------------------------------------------
## EMQ X CONNECTOR Plugin
##--------------------------------------------------------------------

View File

@ -1,2 +0,0 @@
%%-*- mode: erlang -*-
%% emqx_connector config mapping

View File

@ -18,8 +18,6 @@
-behaviour(application).
-emqx_plugin(?MODULE).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->

View File

@ -19,8 +19,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-export([ schema/0
]).
-export([structs/0, fields/1]).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
@ -36,7 +35,9 @@
-export([search/4]).
%%=====================================================================
schema() ->
structs() -> [""].
fields("") ->
redis_fields() ++
emqx_connector_schema_lib:ssl_fields().

View File

@ -19,9 +19,6 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-export([ schema/0
]).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
@ -32,9 +29,13 @@
-export([connect/1]).
-export([structs/0, fields/1]).
-export([mongo_query/5]).
%%=====================================================================
schema() ->
structs() -> [""].
fields("") ->
mongodb_fields() ++
mongodb_topology_fields() ++
mongodb_rs_set_name_fields() ++

View File

@ -28,15 +28,20 @@
-export([connect/1]).
-export([schema/0]).
-export([structs/0, fields/1]).
-export([do_health_check/1]).
%%=====================================================================
schema() ->
%% Hocon schema
structs() -> [""].
fields("") ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
%%=====================================================================
on_jsonify(#{server := Server}= Config) ->
Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.

View File

@ -18,8 +18,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-export([ schema/0
]).
-export([structs/0, fields/1]).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
@ -36,7 +35,9 @@
-export([do_health_check/1]).
%%=====================================================================
schema() ->
structs() -> [""].
fields("") ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().

View File

@ -19,8 +19,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-export([ schema/0
]).
-export([structs/0, fields/1]).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
@ -37,7 +36,9 @@
-export([cmd/3]).
%%=====================================================================
schema() ->
structs() -> [""].
fields("") ->
redis_fields() ++
redis_sentinel_fields() ++
emqx_connector_schema_lib:ssl_fields().

View File

@ -3,7 +3,7 @@
##--------------------------------------------------------------------
emqx_data_bridge.bridges: [
# {name: "mysql"
# {name: "mysql_bridge_1"
# type: mysql
# config: {
# server: "192.168.0.172:3306"
@ -15,7 +15,7 @@ emqx_data_bridge.bridges: [
# ssl: false
# }
# }
# , {name: "pgsql"
# , {name: "pgsql_bridge_1"
# type: pgsql
# config: {
# server: "192.168.0.172:5432"
@ -27,7 +27,7 @@ emqx_data_bridge.bridges: [
# ssl: false
# }
# }
# , {name: "mongodb_single"
# , {name: "mongodb_bridge_single"
# type: mongo
# config: {
# servers: "192.168.0.172:27017"
@ -40,7 +40,7 @@ emqx_data_bridge.bridges: [
# ssl: false
# }
# }
# ,{name: "mongodb_rs"
# ,{name: "mongodb_bridge_rs"
# type: mongo
# config: {
# servers: "127.0.0.1:27017"
@ -54,7 +54,7 @@ emqx_data_bridge.bridges: [
# ssl: false
# }
# }
# ,{name: "mongodb_shared"
# ,{name: "mongodb_bridge_shared"
# type: mongo
# config: {
# servers: "127.0.0.1:27017"
@ -77,7 +77,7 @@ emqx_data_bridge.bridges: [
# min_heartbeat_frequency_ms: 10s
# }
# }
# , {name: "redis_single"
# , {name: "redis_bridge_single"
# type: redis
# config: {
# servers: "192.168.0.172:6379"
@ -89,7 +89,7 @@ emqx_data_bridge.bridges: [
# ssl: false
# }
# }
# ,{name: "redis_sentinel"
# ,{name: "redis_bridge_sentinel"
# type: redis
# config: {
# servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379"
@ -100,7 +100,7 @@ emqx_data_bridge.bridges: [
# ssl: false
# }
# }
# ,{name: "redis_cluster"
# ,{name: "redis_bridge_cluster"
# type: redis
# config: {
# servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379"
@ -111,7 +111,7 @@ emqx_data_bridge.bridges: [
# ssl: false
# }
# }
# , {name: "ldap"
# , {name: "ldap_bridge_1"
# type: ldap
# config: {
# servers: "192.168.0.172"

View File

@ -1,16 +0,0 @@
%%-*- mode: erlang -*-
%% emqx_data_bridge config mapping
{mapping, "emqx_data_bridge.bridges", "emqx_data_bridge.bridges", [
{default, []},
{datatype, string}
]}.
% fields("emqx_data_bridge") ->
% [
% {bridges,
% [fun(mapping) -> "emqx_data_bridge.bridges";
% (type) -> list();
% (_) -> undefined
% end]}
% ]

View File

@ -25,23 +25,24 @@
]).
load_bridges() ->
Bridges = proplists:get_value(bridges,
application:get_all_env(emqx_data_bridge), []),
ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?MODULE]) ++ ".conf",
{ok, RawConfig} = hocon:load(ConfFile, #{format => richmap}),
#{emqx_data_bridge := #{bridges := Bridges}} =
hocon_schema:check(emqx_data_bridge_schema, RawConfig,
#{atom_key => true, return_plain => true}),
emqx_data_bridge_monitor:ensure_all_started(Bridges).
resource_type(<<"mysql">>) -> emqx_connector_mysql;
resource_type(<<"pgsql">>) -> emqx_connector_pgsql;
resource_type(<<"mongo">>) -> emqx_connector_mongo;
resource_type(<<"redis">>) -> emqx_connector_redis;
resource_type(<<"ldap">>) -> emqx_connector_ldap.
bridge_type(emqx_connector_mysql) -> <<"mysql">>;
bridge_type(emqx_connector_pgsql) -> <<"pgsql">>;
bridge_type(emqx_connector_mongo) -> <<"mongo">>;
bridge_type(emqx_connector_redis) -> <<"redis">>;
bridge_type(emqx_connector_ldap) -> <<"ldap">>.
resource_type(mysql) -> emqx_connector_mysql;
resource_type(pgsql) -> emqx_connector_pgsql;
resource_type(mongo) -> emqx_connector_mongo;
resource_type(redis) -> emqx_connector_redis;
resource_type(ldap) -> emqx_connector_ldap.
bridge_type(emqx_connector_mysql) -> mysql;
bridge_type(emqx_connector_pgsql) -> pgsql;
bridge_type(emqx_connector_mongo) -> mongo;
bridge_type(emqx_connector_redis) -> redis;
bridge_type(emqx_connector_ldap) -> ldap.
name_to_resource_id(BridgeName) ->
<<"bridge:", BridgeName/binary>>.

View File

@ -17,8 +17,6 @@
-behaviour(application).
-emqx_plugin(?MODULE).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->

View File

@ -69,9 +69,8 @@ load_bridges(Configs) ->
%% TODO: move this monitor into emqx_resource
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
load_bridge(#{<<"name">> := Name, <<"type">> := Type,
<<"config">> := Config}) ->
case emqx_resource:check_and_create_local(
load_bridge(#{name := Name, type := Type, config := Config}) ->
case emqx_resource:create_local(
emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(Type), Config) of
{ok, _} -> ok;

View File

@ -0,0 +1,23 @@
-module(emqx_data_bridge_schema).
-export([structs/0, fields/1]).
-define(BRIDGE_FIELDS(T),
[{name, hoconsc:t(typerefl:binary())},
{type, hoconsc:t(typerefl:atom(T))},
{config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]).
-define(TYPES, [mysql, pgsql, mongo, redis, ldap]).
-define(BRIDGES, [hoconsc:ref(T) || T <- ?TYPES]).
structs() -> [emqx_data_bridge].
fields(emqx_data_bridge) ->
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
default => []}}];
fields(mysql) -> ?BRIDGE_FIELDS(mysql);
fields(pgsql) -> ?BRIDGE_FIELDS(pgsql);
fields(mongo) -> ?BRIDGE_FIELDS(mongo);
fields(redis) -> ?BRIDGE_FIELDS(redis);
fields(ldap) -> ?BRIDGE_FIELDS(ldap).

View File

@ -1,3 +0,0 @@
##--------------------------------------------------------------------
## EMQ X Resource Plugin
##--------------------------------------------------------------------

View File

@ -15,6 +15,8 @@
%%--------------------------------------------------------------------
-type resource_type() :: module().
-type instance_id() :: binary().
-type raw_resource_config() :: binary() | raw_term_resource_config().
-type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
-type resource_config() :: term().
-type resource_spec() :: map().
-type resource_state() :: term().

View File

@ -1,2 +0,0 @@
%%-*- mode: erlang -*-
%% emqx-resource config mapping

View File

@ -84,7 +84,7 @@
% , inc_counter/3 %% increment the counter by a given integer
]).
-define(EXT, "*.spec").
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => false}).
-optional_callbacks([ on_query/4
, on_health_check/2
@ -267,50 +267,49 @@ call_jsonify(Mod, Config) ->
true -> ?SAFE_CALL(Mod:on_jsonify(Config))
end.
-spec check_config(resource_type(), binary() | term()) ->
-spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}.
check_config(ResourceType, RawConfig) when is_binary(RawConfig) ->
case hocon:binary(RawConfig, #{format => richmap}) of
{ok, MapConfig} ->
do_check_config(ResourceType, MapConfig);
case ?SAFE_CALL(hocon_schema:check(ResourceType, MapConfig, ?HOCON_CHECK_OPTS)) of
{error, Reason} -> {error, Reason};
Config -> {ok, hocon_schema:richmap_to_map(Config)}
end;
Error -> Error
end;
check_config(ResourceType, RawConfigTerm) ->
check_config(ResourceType, jsx:encode(#{config => RawConfigTerm})).
-spec do_check_config(resource_type(), map()) -> {ok, resource_config()} | {error, term()}.
do_check_config(ResourceType, MapConfig) ->
case ?SAFE_CALL(emqx_resource_schema:check(ResourceType, MapConfig)) of
case ?SAFE_CALL(hocon_schema:check_plain(ResourceType, RawConfigTerm, ?HOCON_CHECK_OPTS)) of
{error, Reason} -> {error, Reason};
Config -> {ok, maps:get(config, hocon_schema:richmap_to_map(Config))}
Config -> {ok, Config}
end.
-spec check_and_create(instance_id(), resource_type(), binary() | term()) ->
-spec check_and_create(instance_id(), resource_type(), raw_resource_config()) ->
{ok, resource_data()} | {error, term()}.
check_and_create(InstId, ResourceType, Config) ->
check_and_do(ResourceType, Config,
check_and_create(InstId, ResourceType, RawConfig) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create(InstId, ResourceType, InstConf) end).
-spec check_and_create_local(instance_id(), resource_type(), binary() | term()) ->
-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) ->
{ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, ResourceType, Config) ->
check_and_do(ResourceType, Config,
check_and_create_local(InstId, ResourceType, RawConfig) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end).
-spec check_and_update(instance_id(), resource_type(), binary() | term(), term()) ->
-spec check_and_update(instance_id(), resource_type(), raw_resource_config(), term()) ->
{ok, resource_data()} | {error, term()}.
check_and_update(InstId, ResourceType, Config, Params) ->
check_and_do(ResourceType, Config,
check_and_update(InstId, ResourceType, RawConfig, Params) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> update(InstId, ResourceType, InstConf, Params) end).
-spec check_and_update_local(instance_id(), resource_type(), binary() | term(), term()) ->
-spec check_and_update_local(instance_id(), resource_type(), raw_resource_config(), term()) ->
{ok, resource_data()} | {error, term()}.
check_and_update_local(InstId, ResourceType, Config, Params) ->
check_and_do(ResourceType, Config,
check_and_update_local(InstId, ResourceType, RawConfig, Params) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> update_local(InstId, ResourceType, InstConf, Params) end).
check_and_do(ResourceType, Config, Do) when is_function(Do) ->
case check_config(ResourceType, Config) of
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
case check_config(ResourceType, RawConfig) of
{ok, InstConf} -> Do(InstConf);
Error -> Error
end.

View File

@ -20,8 +20,6 @@
-include("emqx_resource.hrl").
-emqx_plugin(?MODULE).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->

View File

@ -1,34 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_schema).
-export([check/2]).
-export([structs/0, fields/1]).
-behaviour(hocon_schema).
check(SchemaMod, Conf) ->
_ = erlang:erase(res_schema_mod),
erlang:put(res_schema_mod, SchemaMod),
hocon_schema:check(?MODULE, Conf, #{atom_key => true, nullable => false}).
structs() -> ["config"].
fields("config") ->
SchemaMod = erlang:get(res_schema_mod),
SchemaMod:schema().

View File

@ -5,7 +5,4 @@
{emqx_retainer, {{enable_plugin_emqx_retainer}}}.
{emqx_telemetry, {{enable_plugin_emqx_telemetry}}}.
{emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}.
{emqx_resource, {{enable_plugin_emqx_resource}}}.
{emqx_connector, {{enable_plugin_emqx_connector}}}.
{emqx_data_bridge, {{enable_plugin_emqx_data_bridge}}}.
{emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}.

View File

@ -52,7 +52,7 @@
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.6.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}}
]}.

View File

@ -193,9 +193,6 @@ overlay_vars_rel(RelType) ->
end,
[ {enable_plugin_emqx_rule_engine, RelType =:= cloud}
, {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
, {enable_plugin_emqx_resource, true}
, {enable_plugin_emqx_connector, true}
, {enable_plugin_emqx_data_bridge, true}
, {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
, {enable_plugin_emqx_recon, true}
, {enable_plugin_emqx_retainer, true}
@ -254,6 +251,9 @@ relx_apps(ReleaseType) ->
, {emqx_plugin_libs, load}
, observer_cli
, emqx_http_lib
, emqx_resource
, emqx_connector
, emqx_data_bridge
]
++ [emqx_modules || not is_enterprise()]
++ [emqx_license || is_enterprise()]
@ -291,9 +291,6 @@ relx_plugin_apps(ReleaseType) ->
, emqx_auth_mnesia
, emqx_web_hook
, emqx_recon
, emqx_resource
, emqx_connector
, emqx_data_bridge
, emqx_rule_engine
, emqx_sasl
]
@ -371,6 +368,7 @@ etc_overlay(ReleaseType) ->
extra_overlay(cloud) ->
[ {copy,"{{base_dir}}/lib/emqx_lwm2m/lwm2m_xml","etc/"}
, {copy, "{{base_dir}}/lib/emqx_psk_file/etc/psk.txt", "etc/psk.txt"}
, {copy, "{{base_dir}}/lib/emqx_data_bridge/etc/emqx_data_bridge.conf", "etc/plugins/emqx_data_bridge.conf"}
];
extra_overlay(edge) ->
[].