diff --git a/.ci/docker-compose-file/docker-compose-emqx-cluster.yaml b/.ci/docker-compose-file/docker-compose-emqx-cluster.yaml index 3655928e7..6acfe36a2 100644 --- a/.ci/docker-compose-file/docker-compose-emqx-cluster.yaml +++ b/.ci/docker-compose-file/docker-compose-emqx-cluster.yaml @@ -32,6 +32,8 @@ services: image: $TARGET:$EMQX_TAG env_file: - conf.cluster.env + volumes: + - ../../:/src/ environment: - "EMQX_HOST=node1.emqx.io" command: @@ -57,6 +59,8 @@ services: image: $TARGET:$EMQX_TAG env_file: - conf.cluster.env + volumes: + - ../../:/src/ environment: - "EMQX_HOST=node2.emqx.io" command: diff --git a/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml index dee2daff6..f3874fe3b 100644 --- a/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml @@ -2,11 +2,13 @@ version: '3.9' services: mongo_server: - container_name: mongo + container_name: mongo image: mongo:${MONGO_TAG} restart: always environment: MONGO_INITDB_DATABASE: mqtt + expose: + - 27017 networks: - emqx_bridge command: diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 005ac40d0..1b50cdd0d 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -11,6 +11,9 @@ services: - "./toxiproxy.json:/config/toxiproxy.json" ports: - 8474:8474 + - 27011:27011 + - 27016:27016 + - 27017:27017 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 8bc77bdbf..5491ebd89 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -4,5 +4,41 @@ "listen": "0.0.0.0:27017", "upstream": "mongo:27017", "enabled": true + }, + { + "name": "mongo_rs1", + "listen": "0.0.0.0:27011", + "upstream": "mongo_rs1:27017", + "enabled": true + }, + { + "name": "mongo_rs2", + "listen": "0.0.0.0:27012", + "upstream": "mongo_rs2:27017", + "enabled": true + }, + { + "name": "mongo_rs3", + "listen": "0.0.0.0:27013", + "upstream": "mongo_rs3:27017", + "enabled": true + }, + { + "name": "mongo_sharded1", + "listen": "0.0.0.0:27014", + "upstream": "mongosharded1:27017", + "enabled": true + }, + { + "name": "mongo_sharded2", + "listen": "0.0.0.0:27015", + "upstream": "mongosharded2:27017", + "enabled": true + }, + { + "name": "mongo_sharded3", + "listen": "0.0.0.0:27016", + "upstream": "mongosharded3:27017", + "enabled": true } ] diff --git a/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.app.src b/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.app.src index edbbcba9f..588cba808 100644 --- a/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.app.src +++ b/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_pgsql, [{description, "EMQX Authentication/ACL with PostgreSQL"}, - {vsn, "4.4.6"}, % strict semver, bump manually! + {vsn, "4.4.7"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_pgsql_sup]}, {applications, [kernel,stdlib,epgsql,ecpool]}, diff --git a/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.appup.src b/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.appup.src index 7a661c5c7..1e99b80de 100644 --- a/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.appup.src +++ b/apps/emqx_auth_pgsql/src/emqx_auth_pgsql.appup.src @@ -1,11 +1,14 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.5", - [{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, + [{"4.4.6",[{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}]}, + {"4.4.5", + [{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}]}, {"4.4.3", [{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, @@ -17,11 +20,14 @@ {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_pgsql_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.4.5", - [{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, + [{"4.4.6",[{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}]}, + {"4.4.5", + [{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]}]}, {"4.4.3", [{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_auth_pgsql/src/emqx_auth_pgsql_cli.erl b/apps/emqx_auth_pgsql/src/emqx_auth_pgsql_cli.erl index 3371d21db..5fabfead7 100644 --- a/apps/emqx_auth_pgsql/src/emqx_auth_pgsql_cli.erl +++ b/apps/emqx_auth_pgsql/src/emqx_auth_pgsql_cli.erl @@ -22,13 +22,22 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("epgsql/include/epgsql.hrl"). -export([connect/1]). --export([parse_query/2]). +-export([parse_query/2, pgvar/2]). -export([ equery/4 , equery/3 ]). +-export([ get_sql_conf/1 + , put_sql_conf/2 + , clear_sql_conf/1 + , get_cached_statement/1 + , put_cached_statement/2 + , clear_cached_statement/1 + ]). + -type client_info() :: #{username := _, clientid := _, peerhost := _, @@ -44,9 +53,9 @@ parse_query(Par, Sql) -> case re:run(Sql, "'%[ucCad]'", [global, {capture, all, list}]) of {match, Variables} -> Params = [Var || [Var] <- Variables], - {atom_to_list(Par), Params}; + {str(Par), Params}; nomatch -> - {atom_to_list(Par), []} + {str(Par), []} end. pgvar(Sql, Params) -> @@ -86,16 +95,24 @@ connect(Opts) -> {error, Reason} end. -conn_post(Connection) -> - lists:foreach(fun(Par) -> - Sql0 = application:get_env(?APP, Par, undefined), - case parse_query(Par, Sql0) of - undefined -> ok; - {_, Params} -> - Sql = pgvar(Sql0, Params), - epgsql:parse(Connection, atom_to_list(Par), Sql, []) - end - end, [auth_query, acl_query, super_query]). +conn_post(Conn) -> + lists:foreach(fun(PreparedStKey) -> + case prepare_statement(Conn, PreparedStKey) of + {ok, St} -> put_cached_statement(PreparedStKey, St); + Error -> Error + end + end, [auth_query, acl_query, super_query]). + +prepare_statement(Conn, PreparedStKey) -> + %% for emqx_auth_pgsql plugin, the PreparedStKey is an atom(), + %% but for emqx_module_auth_pgsql, it is a list(). + Sql0 = get_sql_conf(PreparedStKey), + case parse_query(PreparedStKey, Sql0) of + undefined -> ok; + {_, Params} -> + Sql = pgvar(Sql0, Params), + epgsql:parse(Conn, str(PreparedStKey), Sql, []) + end. conn_opts(Opts) -> conn_opts(Opts, []). @@ -115,12 +132,50 @@ conn_opts([_Opt|Opts], Acc) -> conn_opts(Opts, Acc). -spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()]) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ). -equery(Pool, Sql, Params) -> - ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, Params) end). +equery(Pool, PreparedStKey, Params) -> + do_equery(Pool, PreparedStKey, Params). -spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()], client_info()) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ). -equery(Pool, Sql, Params, ClientInfo) -> - ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, replvar(Params, ClientInfo)) end). +equery(Pool, PreparedStKey, Params, ClientInfo) -> + Params1 = replvar(Params, ClientInfo), + equery(Pool, PreparedStKey, Params1). + +get_sql_conf(PreparedStKey) -> + application:get_env(?APP, prpst_key_to_atom(PreparedStKey), undefined). + +put_sql_conf(PreparedStKey, Sql) -> + application:set_env(?APP, prpst_key_to_atom(PreparedStKey), Sql). + +clear_sql_conf(PreparedStKey) -> + application:unset_env(?APP, prpst_key_to_atom(PreparedStKey)). + +get_cached_statement(PreparedStKey) -> + application:get_env(?APP, statement_key(PreparedStKey), PreparedStKey). + +put_cached_statement(PreparedStKey, St) -> + application:set_env(?APP, statement_key(PreparedStKey), St). + +clear_cached_statement(PreparedStKey) -> + application:unset_env(?APP, statement_key(PreparedStKey)). + +do_equery(Pool, PreparedStKey, Params) -> + ecpool:with_client(Pool, fun(Conn) -> + StOrKey = get_cached_statement(PreparedStKey), + case epgsql:prepared_query(Conn, StOrKey, Params) of + {error, #error{severity = error, code = <<"26000">>}} -> + %% invalid_sql_statement_name, we need to prepare the statement and try again + case prepare_statement(Conn, PreparedStKey) of + {ok, St} -> + ok = put_cached_statement(PreparedStKey, St), + epgsql:prepared_query(Conn, St, Params); + {error, _} = Error -> + Error + end; + Return -> + Return + end + + end). replvar(Params, ClientInfo) -> replvar(Params, ClientInfo, []). @@ -147,3 +202,26 @@ safe_get(K, ClientInfo) -> bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B; bin(X) -> X. + +str(B) when is_binary(B) -> binary_to_list(B); +str(A) when is_atom(A) -> atom_to_list(A); +str(S) when is_list(S) -> S. + +prpst_key_to_atom(super_query) -> super_query; +prpst_key_to_atom("super_query") -> super_query; +prpst_key_to_atom(auth_query) -> auth_query; +prpst_key_to_atom("auth_query") -> auth_query; +prpst_key_to_atom(acl_query) -> acl_query; +prpst_key_to_atom("acl_query") -> acl_query; +prpst_key_to_atom(PreparedStKey) -> + throw({unsupported_prepared_statement_key, PreparedStKey}). + +%% the spec of application:get_env(App, Key, Def) requires an atom() Key +statement_key(super_query) -> statement_super_query; +statement_key("super_query") -> statement_super_query; +statement_key(auth_query) -> statement_auth_query; +statement_key("auth_query") -> statement_auth_query; +statement_key(acl_query) -> statement_acl_query; +statement_key("acl_query") -> statement_acl_query; +statement_key(PreparedStKey) -> + throw({unsupported_prepared_statement_key, PreparedStKey}). diff --git a/apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE.erl b/apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE.erl index ef73e08c5..c9447eddc 100644 --- a/apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE.erl +++ b/apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --define(POOL, emqx_auth_pgsql). +-define(POOL, emqx_auth_pgsql_test_suite). -define(APP, emqx_auth_pgsql). @@ -67,6 +67,12 @@ (6, false, 'bcrypt_foo', '$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6', '$2a$12$sSS8Eg.ovVzaHzi1nUHYK.'), (7, false, 'bcrypt', '$2y$16$rEVsDarhgHYB0TGnDFJzyu5f.T.Ha9iXMTk9J36NCMWWM7O16qyaK', 'salt')"). +-define(OS_ENVS(SERVER, USER, PASSWD, DB), [SERVER, USER, PASSWD, DB]). +-define(CMD,"EMQX_AUTH__PGSQL__SERVER=127.0.0.1:5432 " + "EMQX_AUTH__PGSQL__USERNAME=root " + "EMQX_AUTH__PGSQL__PASSWORD=public " + "EMQX_AUTH__PGSQL__DATABASE=mqtt " + "./rebar3 ct --name 'test@127.0.0.1' -v --suite apps/emqx_auth_pgsql/test/emqx_auth_pgsql_SUITE"). all() -> emqx_ct:all(?MODULE). @@ -74,14 +80,71 @@ suite() -> [{timetrap, {seconds, 120}}]. init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_auth_pgsql]), + {ok, _} = application:ensure_all_started(ecpool), + case get_params_from_os_env(?OS_ENVS( + "EMQX_AUTH__PGSQL__SERVER", + "EMQX_AUTH__PGSQL__USERNAME", + "EMQX_AUTH__PGSQL__PASSWORD", + "EMQX_AUTH__PGSQL__DATABASE")) of + {error, Reason} -> + ct:print("You could start your test as: "?CMD), + {skip, Reason}; + {ok, ?OS_ENVS(Server, Username, Password, Database)} -> + do_init_per_suite(Server, Username, Password, Database, Config) + end. + +get_params_from_os_env(Keys) -> + get_params_from_os_env(Keys, []). + +get_params_from_os_env([], Acc) -> + {ok, lists:reverse(Acc)}; +get_params_from_os_env([Key | Keys], Acc) -> + case os:getenv(Key) of + false -> {error, iolist_to_binary(io_lib:format("OS Env ~s is undefined", [Key]))}; + Val -> get_params_from_os_env(Keys, [Val | Acc]) + end. + +do_init_per_suite(Server, Username, Password, Database, Config) -> + {HostS, Port} = case string:split(Server, ":", trailing) of + [Domain] -> {string:trim(Domain), 5432}; + [Domain, PortS] -> {string:trim(Domain), list_to_integer(string:trim(PortS))} + end, + Host = case inet:parse_address(HostS) of + {ok, IpAddr} -> IpAddr; + _ -> HostS + end, + Opts = [{host, Host}, + {port, Port}, + {username, Username}, + {password, Password}, + {database, Database}, + {timeout, 5000}, + {auto_reconnect, 15}, + {encoding, utf8}, + {pool_type, random}, + {pool_size, 8}], + case ecpool:start_sup_pool(?POOL, ?MODULE, Opts) of + {ok, _} -> ok; + {error, Reason} -> + {error, Reason} + end, drop_acl(), drop_auth(), init_auth(), init_acl(), + emqx_ct_helpers:start_apps([emqx_auth_pgsql]), set_special_configs(), Config. +connect(Opts) -> + Host = proplists:get_value(host, Opts), + Username = proplists:get_value(username, Opts), + Password = proplists:get_value(password, Opts), + case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of + {ok, C} -> {ok, C}; + {error, Reason} -> {error, Reason} + end. + end_per_suite(Config) -> emqx_ct_helpers:stop_apps([emqx_auth_pgsql]), Config. @@ -204,21 +267,42 @@ reload(Config) when is_list(Config) -> application:start(?APP). init_acl() -> - {ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), - {ok, [], []} = epgsql:squery(Pid, ?DROP_ACL_TABLE), - {ok, [], []} = epgsql:squery(Pid, ?CREATE_ACL_TABLE), - {ok, _} = epgsql:equery(Pid, ?INIT_ACL). + ecpool:with_client(?POOL, fun(Pid) -> + {ok, [], []} = epgsql:squery(Pid, ?DROP_ACL_TABLE), + {ok, [], []} = epgsql:squery(Pid, ?CREATE_ACL_TABLE), + {ok, _} = epgsql:equery(Pid, ?INIT_ACL) + end). drop_acl() -> - {ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), - {ok, [], []}= epgsql:squery(Pid, ?DROP_ACL_TABLE). + ecpool:with_client(?POOL, fun(Pid) -> + {ok, [], []}= epgsql:squery(Pid, ?DROP_ACL_TABLE) + end). init_auth() -> - {ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), - {ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE), - {ok, [], []} = epgsql:squery(Pid, ?CREATE_AUTH_TABLE), - {ok, _} = epgsql:equery(Pid, ?INIT_AUTH). + ecpool:with_client(?POOL, fun(Pid) -> + {ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE), + {ok, [], []} = epgsql:squery(Pid, ?CREATE_AUTH_TABLE), + {ok, _} = epgsql:equery(Pid, ?INIT_AUTH) + end). drop_auth() -> - {ok, Pid} = ecpool_worker:client(gproc_pool:pick_worker({ecpool, ?POOL})), - {ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE). + ecpool:with_client(?POOL, fun(Pid) -> + {ok, [], []} = epgsql:squery(Pid, ?DROP_AUTH_TABLE) + end). + +conn_opts(Opts) -> + conn_opts(Opts, []). +conn_opts([], Acc) -> + Acc; +conn_opts([Opt = {database, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {ssl, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {port, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {timeout, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([Opt = {ssl_opts, _}|Opts], Acc) -> + conn_opts(Opts, [Opt|Acc]); +conn_opts([_Opt|Opts], Acc) -> + conn_opts(Opts, Acc).