feat(auth_mongo): use `with_timeout` for `query`
This commit is contained in:
parent
90995069c6
commit
f94c5ee40a
|
@ -54,7 +54,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
|
||||||
case query(Pool, Collection, maps:from_list(replvars(Selector, ClientInfo))) of
|
case query(Pool, Collection, maps:from_list(replvars(Selector, ClientInfo))) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?tp(emqx_auth_mongo_superuser_check_authn_error, #{error => Reason}),
|
?tp(emqx_auth_mongo_check_authn_error, #{error => Reason}),
|
||||||
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
|
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
|
||||||
{stop, AuthResult#{auth_result => not_authorized, anonymous => false}};
|
{stop, AuthResult#{auth_result => not_authorized, anonymous => false}};
|
||||||
UserMap ->
|
UserMap ->
|
||||||
|
@ -131,6 +131,7 @@ available(Pool, Collection, Query) ->
|
||||||
available(Pool, Collection, Query, Fun) ->
|
available(Pool, Collection, Query, Fun) ->
|
||||||
try Fun(Pool, Collection, Query) of
|
try Fun(Pool, Collection, Query) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
?tp(emqx_auth_mongo_available_error, #{error => Reason}),
|
||||||
?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]),
|
?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
Error = #{<<"code">> := Code} ->
|
Error = #{<<"code">> := Code} ->
|
||||||
|
@ -195,12 +196,10 @@ connect(Opts) ->
|
||||||
mongo_api:connect(Type, Hosts, Options, WorkerOptions).
|
mongo_api:connect(Type, Hosts, Options, WorkerOptions).
|
||||||
|
|
||||||
query(Pool, Collection, Selector) ->
|
query(Pool, Collection, Selector) ->
|
||||||
try
|
Timeout = timer:seconds(15),
|
||||||
|
with_timeout(Timeout, fun() ->
|
||||||
ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end)
|
ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end)
|
||||||
catch
|
end).
|
||||||
Err:Reason ->
|
|
||||||
{error, {Err, Reason}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
query_multi(Pool, Collection, SelectorList) ->
|
query_multi(Pool, Collection, SelectorList) ->
|
||||||
?tp(emqx_auth_mongo_query_multi_enter, #{}),
|
?tp(emqx_auth_mongo_query_multi_enter, #{}),
|
||||||
|
|
|
@ -94,10 +94,11 @@ all() ->
|
||||||
| OtherTCs].
|
| OtherTCs].
|
||||||
|
|
||||||
resilience_tests() ->
|
resilience_tests() ->
|
||||||
[ t_acl_superuser_no_connection
|
[ t_acl_superuser_timeout
|
||||||
, t_available_acl_query_no_connection
|
, t_available_acl_query_no_connection
|
||||||
, t_available_acl_query_timeout
|
, t_available_acl_query_timeout
|
||||||
, t_authn_no_connection
|
, t_available_authn_query_timeout
|
||||||
|
, t_authn_timeout
|
||||||
, t_available
|
, t_available
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -183,7 +184,7 @@ end_per_testcase(t_authn_full_selector_variables, Config) ->
|
||||||
ok;
|
ok;
|
||||||
end_per_testcase(TestCase, Config)
|
end_per_testcase(TestCase, Config)
|
||||||
when TestCase =:= t_available_acl_query_timeout;
|
when TestCase =:= t_available_acl_query_timeout;
|
||||||
TestCase =:= t_acl_superuser_no_connection;
|
TestCase =:= t_acl_superuser_timeout;
|
||||||
TestCase =:= t_authn_no_connection;
|
TestCase =:= t_authn_no_connection;
|
||||||
TestCase =:= t_available_acl_query_no_connection ->
|
TestCase =:= t_available_acl_query_no_connection ->
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
@ -441,10 +442,10 @@ t_is_superuser_undefined(_Config) ->
|
||||||
?assertNot(emqx_auth_mongo:is_superuser(Pool, SuperQuery, ClientInfo)),
|
?assertNot(emqx_auth_mongo:is_superuser(Pool, SuperQuery, ClientInfo)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_authn_no_connection(Config) ->
|
t_authn_timeout(Config) ->
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
FailureType = down,
|
FailureType = timeout,
|
||||||
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
|
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
|
||||||
{username, <<"plain">>},
|
{username, <<"plain">>},
|
||||||
{password, <<"plain">>}]),
|
{password, <<"plain">>}]),
|
||||||
|
@ -460,12 +461,38 @@ t_authn_no_connection(Config) ->
|
||||||
end,
|
end,
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
%% fails with `{exit,{{{badmatch,{{error,closed},...'
|
%% fails with `{exit,{{{badmatch,{{error,closed},...'
|
||||||
?assertMatch([_], ?of_kind(emqx_auth_mongo_superuser_check_authn_error, Trace)),
|
?assertMatch([_], ?of_kind(emqx_auth_mongo_check_authn_error, Trace)),
|
||||||
ok
|
ok
|
||||||
end),
|
end),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% tests query timeout failure
|
||||||
|
t_available_authn_query_timeout(Config) ->
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
FailureType = timeout,
|
||||||
|
SuperQuery = superquery(),
|
||||||
|
|
||||||
|
?check_trace(
|
||||||
|
#{timetrap => timer:seconds(60)},
|
||||||
|
try
|
||||||
|
enable_failure(FailureType, ProxyHost, ProxyPort),
|
||||||
|
Pool = ?APP,
|
||||||
|
%% query_multi returns an empty list even on failures.
|
||||||
|
?assertEqual({error, timeout}, emqx_auth_mongo:available(Pool, SuperQuery)),
|
||||||
|
ok
|
||||||
|
after
|
||||||
|
heal_failure(FailureType, ProxyHost, ProxyPort)
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch(
|
||||||
|
[#{?snk_kind := emqx_auth_mongo_available_error , error := _}],
|
||||||
|
?of_kind(emqx_auth_mongo_available_error, Trace))
|
||||||
|
end),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
%% tests query_multi failure
|
%% tests query_multi failure
|
||||||
t_available_acl_query_no_connection(Config) ->
|
t_available_acl_query_no_connection(Config) ->
|
||||||
test_acl_query_failure(down, Config).
|
test_acl_query_failure(down, Config).
|
||||||
|
@ -488,10 +515,10 @@ t_query_multi_unknown_exception(_Config) ->
|
||||||
meck:unload(ecpool)
|
meck:unload(ecpool)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
t_acl_superuser_no_connection(Config) ->
|
t_acl_superuser_timeout(Config) ->
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
FailureType = down,
|
FailureType = timeout,
|
||||||
reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}),
|
reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}),
|
||||||
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
|
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
|
||||||
{username, <<"plain">>},
|
{username, <<"plain">>},
|
||||||
|
|
Loading…
Reference in New Issue