chore: sync code from enterprise

This commit is contained in:
Shawn 2023-05-06 16:08:31 +08:00
parent d9f8c096ed
commit ee50359357
8 changed files with 252 additions and 39 deletions

View File

@ -32,6 +32,8 @@ services:
image: $TARGET:$EMQX_TAG image: $TARGET:$EMQX_TAG
env_file: env_file:
- conf.cluster.env - conf.cluster.env
volumes:
- ../../:/src/
environment: environment:
- "EMQX_HOST=node1.emqx.io" - "EMQX_HOST=node1.emqx.io"
command: command:
@ -57,6 +59,8 @@ services:
image: $TARGET:$EMQX_TAG image: $TARGET:$EMQX_TAG
env_file: env_file:
- conf.cluster.env - conf.cluster.env
volumes:
- ../../:/src/
environment: environment:
- "EMQX_HOST=node2.emqx.io" - "EMQX_HOST=node2.emqx.io"
command: command:

View File

@ -7,6 +7,8 @@ services:
restart: always restart: always
environment: environment:
MONGO_INITDB_DATABASE: mqtt MONGO_INITDB_DATABASE: mqtt
expose:
- 27017
networks: networks:
- emqx_bridge - emqx_bridge
command: command:

View File

@ -11,6 +11,9 @@ services:
- "./toxiproxy.json:/config/toxiproxy.json" - "./toxiproxy.json:/config/toxiproxy.json"
ports: ports:
- 8474:8474 - 8474:8474
- 27011:27011
- 27016:27016
- 27017:27017
command: command:
- "-host=0.0.0.0" - "-host=0.0.0.0"
- "-config=/config/toxiproxy.json" - "-config=/config/toxiproxy.json"

View File

@ -4,5 +4,41 @@
"listen": "0.0.0.0:27017", "listen": "0.0.0.0:27017",
"upstream": "mongo:27017", "upstream": "mongo:27017",
"enabled": true "enabled": true
},
{
"name": "mongo_rs1",
"listen": "0.0.0.0:27011",
"upstream": "mongo_rs1:27017",
"enabled": true
},
{
"name": "mongo_rs2",
"listen": "0.0.0.0:27012",
"upstream": "mongo_rs2:27017",
"enabled": true
},
{
"name": "mongo_rs3",
"listen": "0.0.0.0:27013",
"upstream": "mongo_rs3:27017",
"enabled": true
},
{
"name": "mongo_sharded1",
"listen": "0.0.0.0:27014",
"upstream": "mongosharded1:27017",
"enabled": true
},
{
"name": "mongo_sharded2",
"listen": "0.0.0.0:27015",
"upstream": "mongosharded2:27017",
"enabled": true
},
{
"name": "mongo_sharded3",
"listen": "0.0.0.0:27016",
"upstream": "mongosharded3:27017",
"enabled": true
} }
] ]

View File

@ -1,6 +1,6 @@
{application, emqx_auth_pgsql, {application, emqx_auth_pgsql,
[{description, "EMQX Authentication/ACL with PostgreSQL"}, [{description, "EMQX Authentication/ACL with PostgreSQL"},
{vsn, "4.4.6"}, % strict semver, bump manually! {vsn, "4.4.7"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_pgsql_sup]}, {registered, [emqx_auth_pgsql_sup]},
{applications, [kernel,stdlib,epgsql,ecpool]}, {applications, [kernel,stdlib,epgsql,ecpool]},

View File

@ -1,11 +1,14 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.4.5", [{"4.4.6",[{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}]},
[{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, {"4.4.5",
[{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]},
{"4.4.4", {"4.4.4",
[{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}]},
{"4.4.3", {"4.4.3",
[{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, [{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]},
@ -17,11 +20,14 @@
{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_pgsql_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_pgsql_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.4.5", [{"4.4.6",[{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}]},
[{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, {"4.4.5",
[{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]},
{"4.4.4", {"4.4.4",
[{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}]},
{"4.4.3", {"4.4.3",
[{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, [{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]},

View File

@ -22,13 +22,22 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("epgsql/include/epgsql.hrl").
-export([connect/1]). -export([connect/1]).
-export([parse_query/2]). -export([parse_query/2, pgvar/2]).
-export([ equery/4 -export([ equery/4
, equery/3 , equery/3
]). ]).
-export([ get_sql_conf/1
, put_sql_conf/2
, clear_sql_conf/1
, get_cached_statement/1
, put_cached_statement/2
, clear_cached_statement/1
]).
-type client_info() :: #{username := _, -type client_info() :: #{username := _,
clientid := _, clientid := _,
peerhost := _, peerhost := _,
@ -44,9 +53,9 @@ parse_query(Par, Sql) ->
case re:run(Sql, "'%[ucCad]'", [global, {capture, all, list}]) of case re:run(Sql, "'%[ucCad]'", [global, {capture, all, list}]) of
{match, Variables} -> {match, Variables} ->
Params = [Var || [Var] <- Variables], Params = [Var || [Var] <- Variables],
{atom_to_list(Par), Params}; {str(Par), Params};
nomatch -> nomatch ->
{atom_to_list(Par), []} {str(Par), []}
end. end.
pgvar(Sql, Params) -> pgvar(Sql, Params) ->
@ -86,16 +95,24 @@ connect(Opts) ->
{error, Reason} {error, Reason}
end. end.
conn_post(Connection) -> conn_post(Conn) ->
lists:foreach(fun(Par) -> lists:foreach(fun(PreparedStKey) ->
Sql0 = application:get_env(?APP, Par, undefined), case prepare_statement(Conn, PreparedStKey) of
case parse_query(Par, Sql0) of {ok, St} -> put_cached_statement(PreparedStKey, St);
Error -> Error
end
end, [auth_query, acl_query, super_query]).
prepare_statement(Conn, PreparedStKey) ->
%% for emqx_auth_pgsql plugin, the PreparedStKey is an atom(),
%% but for emqx_module_auth_pgsql, it is a list().
Sql0 = get_sql_conf(PreparedStKey),
case parse_query(PreparedStKey, Sql0) of
undefined -> ok; undefined -> ok;
{_, Params} -> {_, Params} ->
Sql = pgvar(Sql0, Params), Sql = pgvar(Sql0, Params),
epgsql:parse(Connection, atom_to_list(Par), Sql, []) epgsql:parse(Conn, str(PreparedStKey), Sql, [])
end end.
end, [auth_query, acl_query, super_query]).
conn_opts(Opts) -> conn_opts(Opts) ->
conn_opts(Opts, []). conn_opts(Opts, []).
@ -115,12 +132,50 @@ conn_opts([_Opt|Opts], Acc) ->
conn_opts(Opts, Acc). conn_opts(Opts, Acc).
-spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()]) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ). -spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()]) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ).
equery(Pool, Sql, Params) -> equery(Pool, PreparedStKey, Params) ->
ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, Params) end). do_equery(Pool, PreparedStKey, Params).
-spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()], client_info()) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ). -spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()], client_info()) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ).
equery(Pool, Sql, Params, ClientInfo) -> equery(Pool, PreparedStKey, Params, ClientInfo) ->
ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, replvar(Params, ClientInfo)) end). Params1 = replvar(Params, ClientInfo),
equery(Pool, PreparedStKey, Params1).
get_sql_conf(PreparedStKey) ->
application:get_env(?APP, prpst_key_to_atom(PreparedStKey), undefined).
put_sql_conf(PreparedStKey, Sql) ->
application:set_env(?APP, prpst_key_to_atom(PreparedStKey), Sql).
clear_sql_conf(PreparedStKey) ->
application:unset_env(?APP, prpst_key_to_atom(PreparedStKey)).
get_cached_statement(PreparedStKey) ->
application:get_env(?APP, statement_key(PreparedStKey), PreparedStKey).
put_cached_statement(PreparedStKey, St) ->
application:set_env(?APP, statement_key(PreparedStKey), St).
clear_cached_statement(PreparedStKey) ->
application:unset_env(?APP, statement_key(PreparedStKey)).
do_equery(Pool, PreparedStKey, Params) ->
ecpool:with_client(Pool, fun(Conn) ->
StOrKey = get_cached_statement(PreparedStKey),
case epgsql:prepared_query(Conn, StOrKey, Params) of
{error, #error{severity = error, code = <<"26000">>}} ->
%% invalid_sql_statement_name, we need to prepare the statement and try again
case prepare_statement(Conn, PreparedStKey) of
{ok, St} ->
ok = put_cached_statement(PreparedStKey, St),
epgsql:prepared_query(Conn, St, Params);
{error, _} = Error ->
Error
end;
Return ->
Return
end
end).
replvar(Params, ClientInfo) -> replvar(Params, ClientInfo) ->
replvar(Params, ClientInfo, []). replvar(Params, ClientInfo, []).
@ -147,3 +202,26 @@ safe_get(K, ClientInfo) ->
bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(B) when is_binary(B) -> B; bin(B) when is_binary(B) -> B;
bin(X) -> X. bin(X) -> X.
str(B) when is_binary(B) -> binary_to_list(B);
str(A) when is_atom(A) -> atom_to_list(A);
str(S) when is_list(S) -> S.
prpst_key_to_atom(super_query) -> super_query;
prpst_key_to_atom("super_query") -> super_query;
prpst_key_to_atom(auth_query) -> auth_query;
prpst_key_to_atom("auth_query") -> auth_query;
prpst_key_to_atom(acl_query) -> acl_query;
prpst_key_to_atom("acl_query") -> acl_query;
prpst_key_to_atom(PreparedStKey) ->
throw({unsupported_prepared_statement_key, PreparedStKey}).
%% the spec of application:get_env(App, Key, Def) requires an atom() Key
statement_key(super_query) -> statement_super_query;
statement_key("super_query") -> statement_super_query;
statement_key(auth_query) -> statement_auth_query;
statement_key("auth_query") -> statement_auth_query;
statement_key(acl_query) -> statement_acl_query;
statement_key("acl_query") -> statement_acl_query;
statement_key(PreparedStKey) ->
throw({unsupported_prepared_statement_key, PreparedStKey}).

View File

@ -19,7 +19,7 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-define(POOL, emqx_auth_pgsql). -define(POOL, emqx_auth_pgsql_test_suite).
-define(APP, emqx_auth_pgsql). -define(APP, emqx_auth_pgsql).
@ -67,6 +67,12 @@
(6, false, 'bcrypt_foo', '$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6', '$2a$12$sSS8Eg.ovVzaHzi1nUHYK.'), (6, false, 'bcrypt_foo', '$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6', '$2a$12$sSS8Eg.ovVzaHzi1nUHYK.'),
(7, false, 'bcrypt', '$2y$16$rEVsDarhgHYB0TGnDFJzyu5f.T.Ha9iXMTk9J36NCMWWM7O16qyaK', 'salt')"). (7, false, 'bcrypt', '$2y$16$rEVsDarhgHYB0TGnDFJzyu5f.T.Ha9iXMTk9J36NCMWWM7O16qyaK', 'salt')").
-define(OS_ENVS(SERVER, USER, PASSWD, DB), [SERVER, USER, PASSWD, DB]).
-define(CMD,"EMQX_AUTH__PGSQL__SERVER=127.0.0.1:5432 "
"EMQX_AUTH__PGSQL__USERNAME=root "
"EMQX_AUTH__PGSQL__PASSWORD=public "
"EMQX_AUTH__PGSQL__DATABASE=mqtt "
"./rebar3 ct --name 'test@127.0.0.1' -v --suite apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE").
all() -> all() ->
emqx_ct:all(?MODULE). emqx_ct:all(?MODULE).
@ -74,14 +80,71 @@ suite() ->
[{timetrap, {seconds, 120}}]. [{timetrap, {seconds, 120}}].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_auth_pgsql]), {ok, _} = application:ensure_all_started(ecpool),
case get_params_from_os_env(?OS_ENVS(
"EMQX_AUTH__PGSQL__SERVER",
"EMQX_AUTH__PGSQL__USERNAME",
"EMQX_AUTH__PGSQL__PASSWORD",
"EMQX_AUTH__PGSQL__DATABASE")) of
{error, Reason} ->
ct:print("You could start your test as: "?CMD),
{skip, Reason};
{ok, ?OS_ENVS(Server, Username, Password, Database)} ->
do_init_per_suite(Server, Username, Password, Database, Config)
end.
get_params_from_os_env(Keys) ->
get_params_from_os_env(Keys, []).
get_params_from_os_env([], Acc) ->
{ok, lists:reverse(Acc)};
get_params_from_os_env([Key | Keys], Acc) ->
case os:getenv(Key) of
false -> {error, iolist_to_binary(io_lib:format("OS Env ~s is undefined", [Key]))};
Val -> get_params_from_os_env(Keys, [Val | Acc])
end.
do_init_per_suite(Server, Username, Password, Database, Config) ->
{HostS, Port} = case string:split(Server, ":", trailing) of
[Domain] -> {string:trim(Domain), 5432};
[Domain, PortS] -> {string:trim(Domain), list_to_integer(string:trim(PortS))}
end,
Host = case inet:parse_address(HostS) of
{ok, IpAddr} -> IpAddr;
_ -> HostS
end,
Opts = [{host, Host},
{port, Port},
{username, Username},
{password, Password},
{database, Database},
{timeout, 5000},
{auto_reconnect, 15},
{encoding, utf8},
{pool_type, random},
{pool_size, 8}],
case ecpool:start_sup_pool(?POOL, ?MODULE, Opts) of
{ok, _} -> ok;
{error, Reason} ->
{error, Reason}
end,
drop_acl(), drop_acl(),
drop_auth(), drop_auth(),
init_auth(), init_auth(),
init_acl(), init_acl(),
emqx_ct_helpers:start_apps([emqx_auth_pgsql]),
set_special_configs(), set_special_configs(),
Config. Config.
connect(Opts) ->
Host = proplists:get_value(host, Opts),
Username = proplists:get_value(username, Opts),
Password = proplists:get_value(password, Opts),
case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
{ok, C} -> {ok, C};
{error, Reason} -> {error, Reason}
end.
end_per_suite(Config) -> end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_auth_pgsql]), emqx_ct_helpers:stop_apps([emqx_auth_pgsql]),
Config. Config.
@ -204,21 +267,42 @@ reload(Config) when is_list(Config) ->
application:start(?APP). application:start(?APP).
init_acl() -> init_acl() ->
{ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), ecpool:with_client(?POOL, fun(Pid) ->
{ok, [], []} = epgsql:squery(Pid, ?DROP_ACL_TABLE), {ok, [], []} = epgsql:squery(Pid, ?DROP_ACL_TABLE),
{ok, [], []} = epgsql:squery(Pid, ?CREATE_ACL_TABLE), {ok, [], []} = epgsql:squery(Pid, ?CREATE_ACL_TABLE),
{ok, _} = epgsql:equery(Pid, ?INIT_ACL). {ok, _} = epgsql:equery(Pid, ?INIT_ACL)
end).
drop_acl() -> drop_acl() ->
{ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), ecpool:with_client(?POOL, fun(Pid) ->
{ok, [], []}= epgsql:squery(Pid, ?DROP_ACL_TABLE). {ok, [], []}= epgsql:squery(Pid, ?DROP_ACL_TABLE)
end).
init_auth() -> init_auth() ->
{ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), ecpool:with_client(?POOL, fun(Pid) ->
{ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE), {ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE),
{ok, [], []} = epgsql:squery(Pid, ?CREATE_AUTH_TABLE), {ok, [], []} = epgsql:squery(Pid, ?CREATE_AUTH_TABLE),
{ok, _} = epgsql:equery(Pid, ?INIT_AUTH). {ok, _} = epgsql:equery(Pid, ?INIT_AUTH)
end).
drop_auth() -> drop_auth() ->
{ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), ecpool:with_client(?POOL, fun(Pid) ->
{ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE). {ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE)
end).
conn_opts(Opts) ->
conn_opts(Opts, []).
conn_opts([], Acc) ->
Acc;
conn_opts([Opt = {database, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {ssl, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {port, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {timeout, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {ssl_opts, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([_Opt|Opts], Acc) ->
conn_opts(Opts, Acc).