chore(emqx_authz): compression configuration items
Signed-off-by: zhanghongtong <rory-z@outlook.com>
This commit is contained in:
parent
4e8ac36348
commit
0fd18a2795
|
@ -2,66 +2,56 @@ authorization {
|
||||||
sources = [
|
sources = [
|
||||||
# {
|
# {
|
||||||
# type: http
|
# type: http
|
||||||
# config: {
|
# url: "https://emqx.com"
|
||||||
# url: "https://emqx.com"
|
# headers: {
|
||||||
# headers: {
|
# Accept: "application/json"
|
||||||
# Accept: "application/json"
|
# Content-Type: "application/json"
|
||||||
# Content-Type: "application/json"
|
|
||||||
# }
|
|
||||||
# }
|
# }
|
||||||
# },
|
# },
|
||||||
# {
|
# {
|
||||||
# type: mysql
|
# type: mysql
|
||||||
# config: {
|
# server: "127.0.0.1:3306"
|
||||||
# server: "127.0.0.1:3306"
|
# database: mqtt
|
||||||
# database: mqtt
|
# pool_size: 1
|
||||||
# pool_size: 1
|
# username: root
|
||||||
# username: root
|
# password: public
|
||||||
# password: public
|
# auto_reconnect: true
|
||||||
# auto_reconnect: true
|
# ssl: {
|
||||||
# ssl: {
|
# enable: true
|
||||||
# enable: true
|
# cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
# cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
|
# certfile: "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
# certfile: "{{ platform_etc_dir }}/certs/client-cert.pem"
|
# keyfile: "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
# keyfile: "{{ platform_etc_dir }}/certs/client-key.pem"
|
|
||||||
# }
|
|
||||||
# }
|
# }
|
||||||
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
||||||
# },
|
# },
|
||||||
# {
|
# {
|
||||||
# type: pgsql
|
# type: pgsql
|
||||||
# config: {
|
# server: "127.0.0.1:5432"
|
||||||
# server: "127.0.0.1:5432"
|
# database: mqtt
|
||||||
# database: mqtt
|
# pool_size: 1
|
||||||
# pool_size: 1
|
# username: root
|
||||||
# username: root
|
# password: public
|
||||||
# password: public
|
# auto_reconnect: true
|
||||||
# auto_reconnect: true
|
# ssl: {enable: false}
|
||||||
# ssl: {enable: false}
|
|
||||||
# }
|
|
||||||
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
||||||
# },
|
# },
|
||||||
# {
|
# {
|
||||||
# type: redis
|
# type: redis
|
||||||
# config: {
|
# server: "127.0.0.1:6379"
|
||||||
# server: "127.0.0.1:6379"
|
# database: 0
|
||||||
# database: 0
|
# pool_size: 1
|
||||||
# pool_size: 1
|
# password: public
|
||||||
# password: public
|
# auto_reconnect: true
|
||||||
# auto_reconnect: true
|
# ssl: {enable: false}
|
||||||
# ssl: {enable: false}
|
|
||||||
# }
|
|
||||||
# cmd: "HGETALL mqtt_authz:%u"
|
# cmd: "HGETALL mqtt_authz:%u"
|
||||||
# },
|
# },
|
||||||
# {
|
# {
|
||||||
# type: mongo
|
# type: mongo
|
||||||
# config: {
|
# mongo_type: single
|
||||||
# mongo_type: single
|
# server: "127.0.0.1:27017"
|
||||||
# server: "127.0.0.1:27017"
|
# pool_size: 1
|
||||||
# pool_size: 1
|
# database: mqtt
|
||||||
# database: mqtt
|
# ssl: {enable: false}
|
||||||
# ssl: {enable: false}
|
|
||||||
# }
|
|
||||||
# collection: mqtt_authz
|
# collection: mqtt_authz
|
||||||
# find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
|
# find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
|
||||||
# },
|
# },
|
||||||
|
|
|
@ -224,10 +224,10 @@ init_source(#{enable := true,
|
||||||
Source#{annotations => #{rules => Rules}};
|
Source#{annotations => #{rules => Rules}};
|
||||||
init_source(#{enable := true,
|
init_source(#{enable := true,
|
||||||
type := http,
|
type := http,
|
||||||
config := #{url := Url} = Config
|
url := Url
|
||||||
} = Source) ->
|
} = Source) ->
|
||||||
NConfig = maps:merge(Config, #{base_url => maps:remove(query, Url)}),
|
NSource= maps:put(base_url, maps:remove(query, Url), Source),
|
||||||
case create_resource(Source#{config := NConfig}) of
|
case create_resource(NSource) of
|
||||||
{error, Reason} -> error({load_config_error, Reason});
|
{error, Reason} -> error({load_config_error, Reason});
|
||||||
Id -> Source#{annotations => #{id => Id}}
|
Id -> Source#{annotations => #{id => Id}}
|
||||||
end;
|
end;
|
||||||
|
@ -325,16 +325,14 @@ gen_id(Type) ->
|
||||||
iolist_to_binary([io_lib:format("~s_~s",[?APP, Type])]).
|
iolist_to_binary([io_lib:format("~s_~s",[?APP, Type])]).
|
||||||
|
|
||||||
create_resource(#{type := DB,
|
create_resource(#{type := DB,
|
||||||
config := Config,
|
annotations := #{id := ResourceID}} = Source) ->
|
||||||
annotations := #{id := ResourceID}}) ->
|
case emqx_resource:update(ResourceID, connector_module(DB), Source, []) of
|
||||||
case emqx_resource:update(ResourceID, connector_module(DB), Config, []) of
|
|
||||||
{ok, _} -> ResourceID;
|
{ok, _} -> ResourceID;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end;
|
end;
|
||||||
create_resource(#{type := DB,
|
create_resource(#{type := DB} = Source) ->
|
||||||
config := Config}) ->
|
|
||||||
ResourceID = gen_id(DB),
|
ResourceID = gen_id(DB),
|
||||||
case emqx_resource:create(ResourceID, connector_module(DB), Config) of
|
case emqx_resource:create(ResourceID, connector_module(DB), Source) of
|
||||||
{ok, already_created} -> ResourceID;
|
{ok, already_created} -> ResourceID;
|
||||||
{ok, _} -> ResourceID;
|
{ok, _} -> ResourceID;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
|
|
|
@ -24,11 +24,10 @@
|
||||||
-define(EXAMPLE_REDIS,
|
-define(EXAMPLE_REDIS,
|
||||||
#{type=> redis,
|
#{type=> redis,
|
||||||
enable => true,
|
enable => true,
|
||||||
config => #{server => <<"127.0.0.1:3306">>,
|
server => <<"127.0.0.1:3306">>,
|
||||||
redis_type => single,
|
redis_type => single,
|
||||||
pool_size => 1,
|
pool_size => 1,
|
||||||
auto_reconnect => true
|
auto_reconnect => true,
|
||||||
},
|
|
||||||
cmd => <<"HGETALL mqtt_authz">>}).
|
cmd => <<"HGETALL mqtt_authz">>}).
|
||||||
-define(EXAMPLE_FILE,
|
-define(EXAMPLE_FILE,
|
||||||
#{type=> file,
|
#{type=> file,
|
||||||
|
@ -308,16 +307,16 @@ sources(get, _) ->
|
||||||
rules => [ io_lib:format("~p", [R])|| R <- Rules],
|
rules => [ io_lib:format("~p", [R])|| R <- Rules],
|
||||||
annotations => #{status => healthy}
|
annotations => #{status => healthy}
|
||||||
}]);
|
}]);
|
||||||
(#{type := _Type, config := Config, annotations := #{id := Id}} = Source, AccIn) ->
|
(#{type := _Type, annotations := #{id := Id}} = Source, AccIn) ->
|
||||||
NSource0 = case maps:get(server, Config, undefined) of
|
NSource0 = case maps:get(server, Source, undefined) of
|
||||||
undefined -> Source;
|
undefined -> Source;
|
||||||
Server ->
|
Server ->
|
||||||
Source#{config => Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}}
|
Source#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}
|
||||||
end,
|
end,
|
||||||
NSource1 = case maps:get(servers, Config, undefined) of
|
NSource1 = case maps:get(servers, Source, undefined) of
|
||||||
undefined -> NSource0;
|
undefined -> NSource0;
|
||||||
Servers ->
|
Servers ->
|
||||||
NSource0#{config => Config#{servers => [emqx_connector_schema_lib:ip_port_to_string(Server) || Server <- Servers]}}
|
NSource0#{servers => [emqx_connector_schema_lib:ip_port_to_string(Server) || Server <- Servers]}
|
||||||
end,
|
end,
|
||||||
NSource2 = case emqx_resource:health_check(Id) of
|
NSource2 = case emqx_resource:health_check(Id) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -377,16 +376,16 @@ source(get, #{bindings := #{type := Type}}) ->
|
||||||
annotations => #{status => healthy}
|
annotations => #{status => healthy}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
#{config := Config, annotations := #{id := Id}} = Source ->
|
#{annotations := #{id := Id}} = Source ->
|
||||||
NSource0 = case maps:get(server, Config, undefined) of
|
NSource0 = case maps:get(server, Source, undefined) of
|
||||||
undefined -> Source;
|
undefined -> Source;
|
||||||
Server ->
|
Server ->
|
||||||
Source#{config => Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}}
|
Source#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}
|
||||||
end,
|
end,
|
||||||
NSource1 = case maps:get(servers, Config, undefined) of
|
NSource1 = case maps:get(servers, Source, undefined) of
|
||||||
undefined -> NSource0;
|
undefined -> NSource0;
|
||||||
Servers ->
|
Servers ->
|
||||||
NSource0#{config => Config#{servers => [emqx_connector_schema_lib:ip_port_to_string(Server) || Server <- Servers]}}
|
NSource0#{servers => [emqx_connector_schema_lib:ip_port_to_string(Server) || Server <- Servers]}
|
||||||
end,
|
end,
|
||||||
NSource2 = case emqx_resource:health_check(Id) of
|
NSource2 = case emqx_resource:health_check(Id) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -434,7 +433,7 @@ move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Pos
|
||||||
messgae => atom_to_binary(Reason)}}
|
messgae => atom_to_binary(Reason)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
read_cert(#{config := #{ssl := #{enable := true} = SSL} = Config} = Source) ->
|
read_cert(#{ssl := #{enable := true} = SSL} = Source) ->
|
||||||
CaCert = case file:read_file(maps:get(cacertfile, SSL, "")) of
|
CaCert = case file:read_file(maps:get(cacertfile, SSL, "")) of
|
||||||
{ok, CaCert0} -> CaCert0;
|
{ok, CaCert0} -> CaCert0;
|
||||||
_ -> ""
|
_ -> ""
|
||||||
|
@ -447,14 +446,14 @@ read_cert(#{config := #{ssl := #{enable := true} = SSL} = Config} = Source) ->
|
||||||
{ok, Key0} -> Key0;
|
{ok, Key0} -> Key0;
|
||||||
_ -> ""
|
_ -> ""
|
||||||
end,
|
end,
|
||||||
Source#{config => Config#{ssl => SSL#{cacertfile => CaCert,
|
Source#{ssl => SSL#{cacertfile => CaCert,
|
||||||
certfile => Cert,
|
certfile => Cert,
|
||||||
keyfile => Key
|
keyfile => Key
|
||||||
}}
|
}
|
||||||
};
|
};
|
||||||
read_cert(Source) -> Source.
|
read_cert(Source) -> Source.
|
||||||
|
|
||||||
write_cert(#{<<"config">> := #{<<"ssl">> := #{<<"enable">> := true} = SSL} = Config} = Source) ->
|
write_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
|
||||||
CertPath = filename:join([emqx:get_config([node, data_dir]), "certs"]),
|
CertPath = filename:join([emqx:get_config([node, data_dir]), "certs"]),
|
||||||
CaCert = case maps:is_key(<<"cacertfile">>, SSL) of
|
CaCert = case maps:is_key(<<"cacertfile">>, SSL) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -477,9 +476,9 @@ write_cert(#{<<"config">> := #{<<"ssl">> := #{<<"enable">> := true} = SSL} = Con
|
||||||
KeyFile;
|
KeyFile;
|
||||||
false -> ""
|
false -> ""
|
||||||
end,
|
end,
|
||||||
Source#{<<"config">> := Config#{<<"ssl">> => SSL#{<<"cacertfile">> => CaCert,
|
Source#{<<"ssl">> => SSL#{<<"cacertfile">> => CaCert,
|
||||||
<<"certfile">> => Cert,
|
<<"certfile">> => Cert,
|
||||||
<<"keyfile">> => Key}
|
<<"keyfile">> => Key
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
write_cert(Source) -> Source.
|
write_cert(Source) -> Source.
|
||||||
|
|
|
@ -35,12 +35,12 @@ description() ->
|
||||||
|
|
||||||
authorize(Client, PubSub, Topic,
|
authorize(Client, PubSub, Topic,
|
||||||
#{type := http,
|
#{type := http,
|
||||||
config := #{url := #{path := Path} = Url,
|
url := #{path := Path} = Url,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
method := Method,
|
method := Method,
|
||||||
request_timeout := RequestTimeout} = Config,
|
request_timeout := RequestTimeout,
|
||||||
annotations := #{id := ResourceID}
|
annotations := #{id := ResourceID}
|
||||||
}) ->
|
} = Source) ->
|
||||||
Request = case Method of
|
Request = case Method of
|
||||||
get ->
|
get ->
|
||||||
Query = maps:get(query, Url, ""),
|
Query = maps:get(query, Url, ""),
|
||||||
|
@ -49,7 +49,7 @@ authorize(Client, PubSub, Topic,
|
||||||
_ ->
|
_ ->
|
||||||
Body0 = serialize_body(
|
Body0 = serialize_body(
|
||||||
maps:get('Accept', Headers, <<"application/json">>),
|
maps:get('Accept', Headers, <<"application/json">>),
|
||||||
maps:get(body, Config, #{})
|
maps:get(body, Source, #{})
|
||||||
),
|
),
|
||||||
Body1 = replvar(Body0, PubSub, Topic, Client),
|
Body1 = replvar(Body0, PubSub, Topic, Client),
|
||||||
Path1 = replvar(Path, PubSub, Topic, Client),
|
Path1 = replvar(Path, PubSub, Topic, Client),
|
||||||
|
|
|
@ -20,7 +20,20 @@
|
||||||
roots() -> ["authorization"].
|
roots() -> ["authorization"].
|
||||||
|
|
||||||
fields("authorization") ->
|
fields("authorization") ->
|
||||||
[ {sources, sources()}
|
[ {sources, #{type => union_array(
|
||||||
|
[ hoconsc:ref(?MODULE, file)
|
||||||
|
, hoconsc:ref(?MODULE, http_get)
|
||||||
|
, hoconsc:ref(?MODULE, http_post)
|
||||||
|
, hoconsc:ref(?MODULE, mongo_single)
|
||||||
|
, hoconsc:ref(?MODULE, mongo_rs)
|
||||||
|
, hoconsc:ref(?MODULE, mongo_sharded)
|
||||||
|
, hoconsc:ref(?MODULE, mysql)
|
||||||
|
, hoconsc:ref(?MODULE, pgsql)
|
||||||
|
, hoconsc:ref(?MODULE, redis_single)
|
||||||
|
, hoconsc:ref(?MODULE, redis_sentinel)
|
||||||
|
, hoconsc:ref(?MODULE, redis_cluster)
|
||||||
|
])}
|
||||||
|
}
|
||||||
];
|
];
|
||||||
fields(file) ->
|
fields(file) ->
|
||||||
[ {type, #{type => file}}
|
[ {type, #{type => file}}
|
||||||
|
@ -34,17 +47,11 @@ fields(file) ->
|
||||||
end
|
end
|
||||||
}}
|
}}
|
||||||
];
|
];
|
||||||
fields(http) ->
|
fields(http_get) ->
|
||||||
[ {type, #{type => http}}
|
[ {type, #{type => http}}
|
||||||
, {enable, #{type => boolean(),
|
, {enable, #{type => boolean(),
|
||||||
default => true}}
|
default => true}}
|
||||||
, {config, #{type => hoconsc:union([ hoconsc:ref(?MODULE, http_get)
|
, {url, #{type => url()}}
|
||||||
, hoconsc:ref(?MODULE, http_post)
|
|
||||||
])}
|
|
||||||
}
|
|
||||||
];
|
|
||||||
fields(http_get) ->
|
|
||||||
[ {url, #{type => url()}}
|
|
||||||
, {headers, #{type => map(),
|
, {headers, #{type => map(),
|
||||||
default => #{ <<"accept">> => <<"application/json">>
|
default => #{ <<"accept">> => <<"application/json">>
|
||||||
, <<"cache-control">> => <<"no-cache">>
|
, <<"cache-control">> => <<"no-cache">>
|
||||||
|
@ -68,7 +75,10 @@ fields(http_get) ->
|
||||||
, {request_timeout, #{type => timeout(), default => 30000 }}
|
, {request_timeout, #{type => timeout(), default => 30000 }}
|
||||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
||||||
fields(http_post) ->
|
fields(http_post) ->
|
||||||
[ {url, #{type => url()}}
|
[ {type, #{type => http}}
|
||||||
|
, {enable, #{type => boolean(),
|
||||||
|
default => true}}
|
||||||
|
, {url, #{type => url()}}
|
||||||
, {headers, #{type => map(),
|
, {headers, #{type => map(),
|
||||||
default => #{ <<"accept">> => <<"application/json">>
|
default => #{ <<"accept">> => <<"application/json">>
|
||||||
, <<"cache-control">> => <<"no-cache">>
|
, <<"cache-control">> => <<"no-cache">>
|
||||||
|
@ -97,47 +107,36 @@ fields(http_post) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
||||||
fields(mongo) ->
|
fields(mongo_single) ->
|
||||||
connector_fields(mongo) ++
|
connector_fields(mongo, single) ++
|
||||||
|
[ {collection, #{type => atom()}}
|
||||||
|
, {find, #{type => map()}}
|
||||||
|
];
|
||||||
|
fields(mongo_rs) ->
|
||||||
|
connector_fields(mongo, rs) ++
|
||||||
|
[ {collection, #{type => atom()}}
|
||||||
|
, {find, #{type => map()}}
|
||||||
|
];
|
||||||
|
fields(mongo_sharded) ->
|
||||||
|
connector_fields(mongo, sharded) ++
|
||||||
[ {collection, #{type => atom()}}
|
[ {collection, #{type => atom()}}
|
||||||
, {find, #{type => map()}}
|
, {find, #{type => map()}}
|
||||||
];
|
];
|
||||||
fields(redis) ->
|
|
||||||
connector_fields(redis) ++
|
|
||||||
[ {cmd, query()} ];
|
|
||||||
fields(mysql) ->
|
fields(mysql) ->
|
||||||
connector_fields(mysql) ++
|
connector_fields(mysql) ++
|
||||||
[ {sql, query()} ];
|
[ {sql, query()} ];
|
||||||
fields(pgsql) ->
|
fields(pgsql) ->
|
||||||
connector_fields(pgsql) ++
|
connector_fields(pgsql) ++
|
||||||
[ {sql, query()} ];
|
[ {sql, query()} ];
|
||||||
fields(username) ->
|
fields(redis_single) ->
|
||||||
[{username, #{type => binary()}}];
|
connector_fields(redis, single) ++
|
||||||
fields(clientid) ->
|
[ {cmd, query()} ];
|
||||||
[{clientid, #{type => binary()}}];
|
fields(redis_sentinel) ->
|
||||||
fields(ipaddress) ->
|
connector_fields(redis, sentinel) ++
|
||||||
[{ipaddress, #{type => string()}}];
|
[ {cmd, query()} ];
|
||||||
fields(andlist) ->
|
fields(redis_cluster) ->
|
||||||
[{'and', #{type => union_array(
|
connector_fields(redis, cluster) ++
|
||||||
[ hoconsc:ref(?MODULE, username)
|
[ {cmd, query()} ].
|
||||||
, hoconsc:ref(?MODULE, clientid)
|
|
||||||
, hoconsc:ref(?MODULE, ipaddress)
|
|
||||||
])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
];
|
|
||||||
fields(orlist) ->
|
|
||||||
[{'or', #{type => union_array(
|
|
||||||
[ hoconsc:ref(?MODULE, username)
|
|
||||||
, hoconsc:ref(?MODULE, clientid)
|
|
||||||
, hoconsc:ref(?MODULE, ipaddress)
|
|
||||||
])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
];
|
|
||||||
fields(eq_topic) ->
|
|
||||||
[{eq, #{type => binary()}}].
|
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
@ -146,17 +145,6 @@ fields(eq_topic) ->
|
||||||
union_array(Item) when is_list(Item) ->
|
union_array(Item) when is_list(Item) ->
|
||||||
hoconsc:array(hoconsc:union(Item)).
|
hoconsc:array(hoconsc:union(Item)).
|
||||||
|
|
||||||
sources() ->
|
|
||||||
#{type => union_array(
|
|
||||||
[ hoconsc:ref(?MODULE, file)
|
|
||||||
, hoconsc:ref(?MODULE, http)
|
|
||||||
, hoconsc:ref(?MODULE, mysql)
|
|
||||||
, hoconsc:ref(?MODULE, pgsql)
|
|
||||||
, hoconsc:ref(?MODULE, redis)
|
|
||||||
, hoconsc:ref(?MODULE, mongo)
|
|
||||||
])
|
|
||||||
}.
|
|
||||||
|
|
||||||
query() ->
|
query() ->
|
||||||
#{type => binary(),
|
#{type => binary(),
|
||||||
validator => fun(S) ->
|
validator => fun(S) ->
|
||||||
|
@ -168,6 +156,8 @@ query() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
connector_fields(DB) ->
|
connector_fields(DB) ->
|
||||||
|
connector_fields(DB, config).
|
||||||
|
connector_fields(DB, Fields) ->
|
||||||
Mod0 = io_lib:format("~s_~s",[emqx_connector, DB]),
|
Mod0 = io_lib:format("~s_~s",[emqx_connector, DB]),
|
||||||
Mod = try
|
Mod = try
|
||||||
list_to_existing_atom(Mod0)
|
list_to_existing_atom(Mod0)
|
||||||
|
@ -180,4 +170,4 @@ connector_fields(DB) ->
|
||||||
[ {type, #{type => DB}}
|
[ {type, #{type => DB}}
|
||||||
, {enable, #{type => boolean(),
|
, {enable, #{type => boolean(),
|
||||||
default => true}}
|
default => true}}
|
||||||
] ++ Mod:roots().
|
] ++ Mod:fields(Fields).
|
||||||
|
|
|
@ -62,56 +62,51 @@ init_per_testcase(_, Config) ->
|
||||||
|
|
||||||
-define(SOURCE1, #{<<"type">> => <<"http">>,
|
-define(SOURCE1, #{<<"type">> => <<"http">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"url">> => <<"https://fake.com:443/">>,
|
||||||
<<"url">> => <<"https://fake.com:443/">>,
|
<<"headers">> => #{},
|
||||||
<<"headers">> => #{},
|
<<"method">> => <<"get">>,
|
||||||
<<"method">> => <<"get">>,
|
<<"request_timeout">> => 5000
|
||||||
<<"request_timeout">> => 5000}
|
|
||||||
}).
|
}).
|
||||||
-define(SOURCE2, #{<<"type">> => <<"mongo">>,
|
-define(SOURCE2, #{<<"type">> => <<"mongo">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"mongo_type">> => <<"single">>,
|
||||||
<<"mongo_type">> => <<"single">>,
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"collection">> => <<"fake">>,
|
<<"collection">> => <<"fake">>,
|
||||||
<<"find">> => #{<<"a">> => <<"b">>}
|
<<"find">> => #{<<"a">> => <<"b">>}
|
||||||
}).
|
}).
|
||||||
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
|
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"username">> => <<"xx">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"sql">> => <<"abcb">>
|
<<"sql">> => <<"abcb">>
|
||||||
}).
|
}).
|
||||||
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
|
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"username">> => <<"xx">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"sql">> => <<"abcb">>
|
<<"sql">> => <<"abcb">>
|
||||||
}).
|
}).
|
||||||
-define(SOURCE5, #{<<"type">> => <<"redis">>,
|
-define(SOURCE5, #{<<"type">> => <<"redis">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => 0,
|
||||||
<<"database">> => 0,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
|
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
|
||||||
}).
|
}).
|
||||||
-define(SOURCE6, #{<<"type">> => <<"file">>,
|
-define(SOURCE6, #{<<"type">> => <<"file">>,
|
||||||
|
|
|
@ -39,61 +39,55 @@
|
||||||
|
|
||||||
-define(SOURCE1, #{<<"type">> => <<"http">>,
|
-define(SOURCE1, #{<<"type">> => <<"http">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"url">> => <<"https://fake.com:443/">>,
|
||||||
<<"url">> => <<"https://fake.com:443/">>,
|
<<"headers">> => #{},
|
||||||
<<"headers">> => #{},
|
<<"method">> => <<"get">>,
|
||||||
<<"method">> => <<"get">>,
|
<<"request_timeout">> => 5000
|
||||||
<<"request_timeout">> => 5000}
|
|
||||||
}).
|
}).
|
||||||
-define(SOURCE2, #{<<"type">> => <<"mongo">>,
|
-define(SOURCE2, #{<<"type">> => <<"mongo">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"mongo_type">> => <<"sharded">>,
|
||||||
<<"mongo_type">> => <<"sharded">>,
|
<<"servers">> => [<<"127.0.0.1:27017">>,
|
||||||
<<"servers">> => [<<"127.0.0.1:27017">>,
|
<<"192.168.0.1:27017">>
|
||||||
<<"192.168.0.1:27017">>
|
],
|
||||||
],
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"collection">> => <<"fake">>,
|
<<"collection">> => <<"fake">>,
|
||||||
<<"find">> => #{<<"a">> => <<"b">>}
|
<<"find">> => #{<<"a">> => <<"b">>}
|
||||||
}).
|
}).
|
||||||
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
|
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:3306">>,
|
||||||
<<"server">> => <<"127.0.0.1:3306">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"username">> => <<"xx">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"sql">> => <<"abcb">>
|
<<"sql">> => <<"abcb">>
|
||||||
}).
|
}).
|
||||||
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
|
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:5432">>,
|
||||||
<<"server">> => <<"127.0.0.1:5432">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"username">> => <<"xx">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"sql">> => <<"abcb">>
|
<<"sql">> => <<"abcb">>
|
||||||
}).
|
}).
|
||||||
-define(SOURCE5, #{<<"type">> => <<"redis">>,
|
-define(SOURCE5, #{<<"type">> => <<"redis">>,
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"config">> => #{
|
<<"servers">> => [<<"127.0.0.1:6379">>,
|
||||||
<<"servers">> => [<<"127.0.0.1:6379">>,
|
<<"127.0.0.1:6380">>
|
||||||
<<"127.0.0.1:6380">>
|
],
|
||||||
],
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => 0,
|
||||||
<<"database">> => 0,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}
|
|
||||||
},
|
|
||||||
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
|
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
|
||||||
}).
|
}).
|
||||||
-define(SOURCE6, #{<<"type">> => <<"file">>,
|
-define(SOURCE6, #{<<"type">> => <<"file">>,
|
||||||
|
@ -207,27 +201,22 @@ t_api(_) ->
|
||||||
{ok, 200, Result4} = request(get, uri(["authorization", "sources", "http"]), []),
|
{ok, 200, Result4} = request(get, uri(["authorization", "sources", "http"]), []),
|
||||||
?assertMatch(#{<<"type">> := <<"http">>, <<"enable">> := false}, jsx:decode(Result4)),
|
?assertMatch(#{<<"type">> := <<"http">>, <<"enable">> := false}, jsx:decode(Result4)),
|
||||||
|
|
||||||
#{<<"config">> := Config} = ?SOURCE2,
|
|
||||||
|
|
||||||
dbg:tracer(),dbg:p(all,c),
|
|
||||||
dbg:tpl(emqx_authz_api_sources, read_cert, cx),
|
|
||||||
dbg:tpl(emqx_authz_api_sources, write_cert, cx),
|
|
||||||
{ok, 204, _} = request(put, uri(["authorization", "sources", "mongo"]),
|
{ok, 204, _} = request(put, uri(["authorization", "sources", "mongo"]),
|
||||||
?SOURCE2#{<<"config">> := Config#{<<"ssl">> := #{
|
?SOURCE2#{<<"ssl">> := #{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"cacertfile">> => <<"fake cacert file">>,
|
<<"cacertfile">> => <<"fake cacert file">>,
|
||||||
<<"certfile">> => <<"fake cert file">>,
|
<<"certfile">> => <<"fake cert file">>,
|
||||||
<<"keyfile">> => <<"fake key file">>,
|
<<"keyfile">> => <<"fake key file">>,
|
||||||
<<"verify">> => false
|
<<"verify">> => false
|
||||||
}}}),
|
}}),
|
||||||
{ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongo"]), []),
|
{ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongo"]), []),
|
||||||
?assertMatch(#{<<"type">> := <<"mongo">>,
|
?assertMatch(#{<<"type">> := <<"mongo">>,
|
||||||
<<"config">> := #{<<"ssl">> := #{<<"enable">> := true,
|
<<"ssl">> := #{<<"enable">> := true,
|
||||||
<<"cacertfile">> := <<"fake cacert file">>,
|
<<"cacertfile">> := <<"fake cacert file">>,
|
||||||
<<"certfile">> := <<"fake cert file">>,
|
<<"certfile">> := <<"fake cert file">>,
|
||||||
<<"keyfile">> := <<"fake key file">>,
|
<<"keyfile">> := <<"fake key file">>,
|
||||||
<<"verify">> := false
|
<<"verify">> := false
|
||||||
}}
|
}
|
||||||
}, jsx:decode(Result5)),
|
}, jsx:decode(Result5)),
|
||||||
?assert(filelib:is_file(filename:join([emqx:get_config([node, data_dir]), "certs", "cacert-fake.pem"]))),
|
?assert(filelib:is_file(filename:join([emqx:get_config([node, data_dir]), "certs", "cacert-fake.pem"]))),
|
||||||
?assert(filelib:is_file(filename:join([emqx:get_config([node, data_dir]), "certs", "cert-fake.pem"]))),
|
?assert(filelib:is_file(filename:join([emqx:get_config([node, data_dir]), "certs", "cert-fake.pem"]))),
|
||||||
|
|
|
@ -47,12 +47,11 @@ init_per_suite(Config) ->
|
||||||
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||||
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||||
Rules = [#{<<"type">> => <<"http">>,
|
Rules = [#{<<"type">> => <<"http">>,
|
||||||
<<"config">> => #{
|
<<"url">> => <<"https://fake.com:443/">>,
|
||||||
<<"url">> => <<"https://fake.com:443/">>,
|
<<"headers">> => #{},
|
||||||
<<"headers">> => #{},
|
<<"method">> => <<"get">>,
|
||||||
<<"method">> => <<"get">>,
|
<<"request_timeout">> => 5000
|
||||||
<<"request_timeout">> => 5000
|
}
|
||||||
}}
|
|
||||||
],
|
],
|
||||||
{ok, _} = emqx_authz:update(replace, Rules),
|
{ok, _} = emqx_authz:update(replace, Rules),
|
||||||
Config.
|
Config.
|
||||||
|
|
|
@ -47,12 +47,11 @@ init_per_suite(Config) ->
|
||||||
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||||
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||||
Rules = [#{<<"type">> => <<"mongo">>,
|
Rules = [#{<<"type">> => <<"mongo">>,
|
||||||
<<"config">> => #{
|
<<"mongo_type">> => <<"single">>,
|
||||||
<<"mongo_type">> => <<"single">>,
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"collection">> => <<"fake">>,
|
<<"collection">> => <<"fake">>,
|
||||||
<<"find">> => #{<<"a">> => <<"b">>}
|
<<"find">> => #{<<"a">> => <<"b">>}
|
||||||
}],
|
}],
|
||||||
|
|
|
@ -48,14 +48,13 @@ init_per_suite(Config) ->
|
||||||
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||||
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||||
Rules = [#{<<"type">> => <<"mysql">>,
|
Rules = [#{<<"type">> => <<"mysql">>,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"username">> => <<"xx">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"sql">> => <<"abcb">>
|
<<"sql">> => <<"abcb">>
|
||||||
}],
|
}],
|
||||||
{ok, _} = emqx_authz:update(replace, Rules),
|
{ok, _} = emqx_authz:update(replace, Rules),
|
||||||
|
|
|
@ -48,14 +48,13 @@ init_per_suite(Config) ->
|
||||||
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||||
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||||
Rules = [#{<<"type">> => <<"pgsql">>,
|
Rules = [#{<<"type">> => <<"pgsql">>,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => <<"mqtt">>,
|
||||||
<<"database">> => <<"mqtt">>,
|
<<"username">> => <<"xx">>,
|
||||||
<<"username">> => <<"xx">>,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"sql">> => <<"abcb">>
|
<<"sql">> => <<"abcb">>
|
||||||
}],
|
}],
|
||||||
{ok, _} = emqx_authz:update(replace, Rules),
|
{ok, _} = emqx_authz:update(replace, Rules),
|
||||||
|
|
|
@ -47,13 +47,12 @@ init_per_suite(Config) ->
|
||||||
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||||
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||||
Rules = [#{<<"type">> => <<"redis">>,
|
Rules = [#{<<"type">> => <<"redis">>,
|
||||||
<<"config">> => #{
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
<<"server">> => <<"127.0.0.1:27017">>,
|
<<"pool_size">> => 1,
|
||||||
<<"pool_size">> => 1,
|
<<"database">> => 0,
|
||||||
<<"database">> => 0,
|
<<"password">> => <<"ee">>,
|
||||||
<<"password">> => <<"ee">>,
|
<<"auto_reconnect">> => true,
|
||||||
<<"auto_reconnect">> => true,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"ssl">> => #{<<"enable">> => false}},
|
|
||||||
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
|
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
|
||||||
}],
|
}],
|
||||||
{ok, _} = emqx_authz:update(replace, Rules),
|
{ok, _} = emqx_authz:update(replace, Rules),
|
||||||
|
|
Loading…
Reference in New Issue