chore(connector): update schema file
Signed-off-by: zhanghongtong <rory-z@outlook.com>
This commit is contained in:
parent
793aa951e3
commit
c10d154dab
|
@ -64,15 +64,10 @@ create_resource(#{type := DB,
|
||||||
config := Config
|
config := Config
|
||||||
} = Rule) ->
|
} = Rule) ->
|
||||||
ResourceID = iolist_to_binary([io_lib:format("~s_~s",[?APP, DB]), "_", integer_to_list(erlang:system_time())]),
|
ResourceID = iolist_to_binary([io_lib:format("~s_~s",[?APP, DB]), "_", integer_to_list(erlang:system_time())]),
|
||||||
NConfig = case DB of
|
case emqx_resource:create(
|
||||||
redis -> #{config => Config };
|
|
||||||
mongo -> #{config => Config };
|
|
||||||
_ -> Config
|
|
||||||
end,
|
|
||||||
case emqx_resource:check_and_create(
|
|
||||||
ResourceID,
|
ResourceID,
|
||||||
list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
|
list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
|
||||||
NConfig)
|
Config)
|
||||||
of
|
of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
Rule#{resource_id => ResourceID};
|
Rule#{resource_id => ResourceID};
|
||||||
|
|
|
@ -16,30 +16,20 @@ structs() -> ["emqx_authz"].
|
||||||
fields("emqx_authz") ->
|
fields("emqx_authz") ->
|
||||||
[ {rules, rules()}
|
[ {rules, rules()}
|
||||||
];
|
];
|
||||||
fields(mongo_connector) ->
|
fields(mongo) ->
|
||||||
[ {principal, principal()}
|
connector_fields(mongo) ++
|
||||||
, {type, #{type => hoconsc:enum([mongo])}}
|
[ {collection, #{type => atom()}}
|
||||||
, {config, #{type => map()}}
|
|
||||||
, {collection, #{type => atom()}}
|
|
||||||
, {find, #{type => map()}}
|
, {find, #{type => map()}}
|
||||||
];
|
];
|
||||||
fields(redis_connector) ->
|
fields(redis) ->
|
||||||
[ {principal, principal()}
|
connector_fields(redis) ++
|
||||||
, {type, #{type => hoconsc:enum([redis])}}
|
[ {cmd, query()} ];
|
||||||
, {config, #{type => hoconsc:union(
|
fields(mysql) ->
|
||||||
[ hoconsc:ref(emqx_connector_redis, cluster)
|
connector_fields(mysql) ++
|
||||||
, hoconsc:ref(emqx_connector_redis, sentinel)
|
[ {sql, query()} ];
|
||||||
, hoconsc:ref(emqx_connector_redis, single)
|
fields(pgsql) ->
|
||||||
])}
|
connector_fields(pgsql) ++
|
||||||
}
|
[ {sql, query()} ];
|
||||||
, {cmd, query()}
|
|
||||||
];
|
|
||||||
fields(sql_connector) ->
|
|
||||||
[ {principal, principal() }
|
|
||||||
, {type, #{type => hoconsc:enum([mysql, pgsql])}}
|
|
||||||
, {config, #{type => map()}}
|
|
||||||
, {sql, query()}
|
|
||||||
];
|
|
||||||
fields(simple_rule) ->
|
fields(simple_rule) ->
|
||||||
[ {permission, #{type => permission()}}
|
[ {permission, #{type => permission()}}
|
||||||
, {action, #{type => action()}}
|
, {action, #{type => action()}}
|
||||||
|
@ -88,9 +78,10 @@ union_array(Item) when is_list(Item) ->
|
||||||
rules() ->
|
rules() ->
|
||||||
#{type => union_array(
|
#{type => union_array(
|
||||||
[ hoconsc:ref(?MODULE, simple_rule)
|
[ hoconsc:ref(?MODULE, simple_rule)
|
||||||
, hoconsc:ref(?MODULE, sql_connector)
|
, hoconsc:ref(?MODULE, mysql)
|
||||||
, hoconsc:ref(?MODULE, redis_connector)
|
, hoconsc:ref(?MODULE, pgsql)
|
||||||
, hoconsc:ref(?MODULE, mongo_connector)
|
, hoconsc:ref(?MODULE, redis)
|
||||||
|
, hoconsc:ref(?MODULE, mongo)
|
||||||
])
|
])
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -115,3 +106,9 @@ query() ->
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
connector_fields(DB) ->
|
||||||
|
Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
|
||||||
|
[ {principal, principal()}
|
||||||
|
, {type, #{type => DB}}
|
||||||
|
] ++ Mod:fields("").
|
||||||
|
|
|
@ -30,7 +30,7 @@ groups() ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, check_and_create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ groups() ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, check_and_create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ groups() ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, check_and_create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ groups() ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(emqx_resource, check_and_create, fun(_, _, _) -> {ok, meck_data} end ),
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
|
||||||
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
|
|
@ -88,24 +88,24 @@ on_jsonify(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
on_start(InstId, #{config := #{server := Server,
|
on_start(InstId, Config = #{server := Server,
|
||||||
mongo_type := single} = Config}) ->
|
mongo_type := single}) ->
|
||||||
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
||||||
Opts = [{type, single},
|
Opts = [{type, single},
|
||||||
{hosts, [Server]}
|
{hosts, [Server]}
|
||||||
],
|
],
|
||||||
do_start(InstId, Opts, Config);
|
do_start(InstId, Opts, Config);
|
||||||
|
|
||||||
on_start(InstId, #{config := #{servers := Servers,
|
on_start(InstId, Config = #{servers := Servers,
|
||||||
mongo_type := rs,
|
mongo_type := rs,
|
||||||
replicaset_name := RsName} = Config}) ->
|
replicaset_name := RsName}) ->
|
||||||
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
||||||
Opts = [{type, {rs, RsName}},
|
Opts = [{type, {rs, RsName}},
|
||||||
{hosts, Servers}],
|
{hosts, Servers}],
|
||||||
do_start(InstId, Opts, Config);
|
do_start(InstId, Opts, Config);
|
||||||
|
|
||||||
on_start(InstId, #{config := #{servers := Servers,
|
on_start(InstId, Config = #{servers := Servers,
|
||||||
mongo_type := sharded} = Config}) ->
|
mongo_type := sharded}) ->
|
||||||
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
||||||
Opts = [{type, sharded},
|
Opts = [{type, sharded},
|
||||||
{hosts, Servers}
|
{hosts, Servers}
|
||||||
|
|
|
@ -37,6 +37,9 @@
|
||||||
structs() -> [""].
|
structs() -> [""].
|
||||||
|
|
||||||
fields("") ->
|
fields("") ->
|
||||||
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}];
|
||||||
|
|
||||||
|
fields(config) ->
|
||||||
emqx_connector_schema_lib:relational_db_fields() ++
|
emqx_connector_schema_lib:relational_db_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,9 @@
|
||||||
structs() -> [""].
|
structs() -> [""].
|
||||||
|
|
||||||
fields("") ->
|
fields("") ->
|
||||||
|
[{config, #{type => hoconsc:ref(?MODULE, config)}}];
|
||||||
|
|
||||||
|
fields(config) ->
|
||||||
emqx_connector_schema_lib:relational_db_fields() ++
|
emqx_connector_schema_lib:relational_db_fields() ++
|
||||||
emqx_connector_schema_lib:ssl_fields().
|
emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
|
|
|
@ -78,11 +78,11 @@ on_jsonify(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
on_start(InstId, #{config :=#{redis_type := Type,
|
on_start(InstId, #{redis_type := Type,
|
||||||
database := Database,
|
database := Database,
|
||||||
pool_size := PoolSize,
|
pool_size := PoolSize,
|
||||||
auto_reconnect := AutoReconn,
|
auto_reconnect := AutoReconn,
|
||||||
ssl := SSL } = Config}) ->
|
ssl := SSL } = Config) ->
|
||||||
logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]),
|
logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]),
|
||||||
Servers = case Type of
|
Servers = case Type of
|
||||||
single -> [{servers, [maps:get(server, Config)]}];
|
single -> [{servers, [maps:get(server, Config)]}];
|
||||||
|
|
|
@ -86,10 +86,12 @@ relational_db_fields() ->
|
||||||
].
|
].
|
||||||
|
|
||||||
server(type) -> emqx_schema:ip_port();
|
server(type) -> emqx_schema:ip_port();
|
||||||
|
server(nullable) -> false;
|
||||||
server(validator) -> [?REQUIRED("the field 'server' is required")];
|
server(validator) -> [?REQUIRED("the field 'server' is required")];
|
||||||
server(_) -> undefined.
|
server(_) -> undefined.
|
||||||
|
|
||||||
database(type) -> binary();
|
database(type) -> binary();
|
||||||
|
database(nullable) -> false;
|
||||||
database(validator) -> [?REQUIRED("the field 'database' is required")];
|
database(validator) -> [?REQUIRED("the field 'database' is required")];
|
||||||
database(_) -> undefined.
|
database(_) -> undefined.
|
||||||
|
|
||||||
|
|
|
@ -5,11 +5,6 @@
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
%% Hocon Schema Definitions
|
%% Hocon Schema Definitions
|
||||||
|
|
||||||
-define(BRIDGE_FIELDS(T),
|
|
||||||
[{name, hoconsc:t(typerefl:binary())},
|
|
||||||
{type, hoconsc:t(typerefl:atom(T))},
|
|
||||||
{config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]).
|
|
||||||
|
|
||||||
-define(TYPES, [mysql, pgsql, mongo, redis, ldap]).
|
-define(TYPES, [mysql, pgsql, mongo, redis, ldap]).
|
||||||
-define(BRIDGES, [hoconsc:ref(?MODULE, T) || T <- ?TYPES]).
|
-define(BRIDGES, [hoconsc:ref(?MODULE, T) || T <- ?TYPES]).
|
||||||
|
|
||||||
|
@ -19,8 +14,13 @@ fields("emqx_data_bridge") ->
|
||||||
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
|
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
|
||||||
default => []}}];
|
default => []}}];
|
||||||
|
|
||||||
fields(mysql) -> ?BRIDGE_FIELDS(mysql);
|
fields(mysql) -> connector_fields(mysql);
|
||||||
fields(pgsql) -> ?BRIDGE_FIELDS(pgsql);
|
fields(pgsql) -> connector_fields(pgsql);
|
||||||
fields(mongo) -> ?BRIDGE_FIELDS(mongo);
|
fields(mongo) -> connector_fields(mongo);
|
||||||
fields(redis) -> ?BRIDGE_FIELDS(redis);
|
fields(redis) -> connector_fields(redis);
|
||||||
fields(ldap) -> ?BRIDGE_FIELDS(ldap).
|
fields(ldap) -> connector_fields(ldap).
|
||||||
|
|
||||||
|
connector_fields(DB) ->
|
||||||
|
Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
|
||||||
|
[{name, hoconsc:t(typerefl:binary())},
|
||||||
|
{type, #{type => DB}}] ++ Mod:fields("").
|
||||||
|
|
|
@ -39,7 +39,7 @@ enum(Items) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
required(ErrMsg) ->
|
required(ErrMsg) ->
|
||||||
fun(undefined) -> {error, ErrMsg};
|
fun(<<>>) -> {error, ErrMsg};
|
||||||
(_) -> ok
|
(_) -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue