feat(mongo): add timeouts and more tests
This commit is contained in:
parent
83fb479311
commit
3f02c6b574
|
@ -203,16 +203,51 @@ query(Pool, Collection, Selector) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
query_multi(Pool, Collection, SelectorList) ->
|
query_multi(Pool, Collection, SelectorList) ->
|
||||||
|
?tp(emqx_auth_mongo_query_multi_enter, #{}),
|
||||||
|
Timeout = timer:seconds(45),
|
||||||
lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) ->
|
lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) ->
|
||||||
Batch = ecpool:with_client(Pool, fun(Conn) ->
|
Res =
|
||||||
case mongo_api:find(Conn, Collection, Selector, #{}) of
|
with_timeout(Timeout, fun() ->
|
||||||
{error, Reason} ->
|
ecpool:with_client(Pool, fun(Conn) ->
|
||||||
?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]),
|
?tp(emqx_auth_mongo_query_multi_find_selector, #{}),
|
||||||
[];
|
case find(Conn, Collection, Selector) of
|
||||||
[] -> [];
|
{error, Reason} ->
|
||||||
{ok, Cursor} ->
|
?tp(emqx_auth_mongo_query_multi_error,
|
||||||
mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000)
|
#{error => Reason}),
|
||||||
end
|
?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]),
|
||||||
end),
|
[];
|
||||||
[Batch|Acc1]
|
[] ->
|
||||||
|
?tp(emqx_auth_mongo_query_multi_no_results, #{}),
|
||||||
|
[];
|
||||||
|
{ok, Cursor} ->
|
||||||
|
mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end),
|
||||||
|
case Res of
|
||||||
|
{error, timeout} ->
|
||||||
|
?tp(emqx_auth_mongo_query_multi_error, #{error => timeout}),
|
||||||
|
?LOG(error, "[MongoDB] query_multi timeout", []),
|
||||||
|
Acc1;
|
||||||
|
Batch ->
|
||||||
|
[Batch | Acc1]
|
||||||
|
end
|
||||||
end, [], SelectorList))).
|
end, [], SelectorList))).
|
||||||
|
|
||||||
|
find(Conn, Collection, Selector) ->
|
||||||
|
try
|
||||||
|
mongo_api:find(Conn, Collection, Selector, #{})
|
||||||
|
catch
|
||||||
|
K:E:S ->
|
||||||
|
{error, {K, E, S}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
with_timeout(Timeout, Fun) ->
|
||||||
|
try
|
||||||
|
emqx_misc:nolink_apply(Fun, Timeout)
|
||||||
|
catch
|
||||||
|
exit:timeout ->
|
||||||
|
{error, timeout};
|
||||||
|
K:E:S ->
|
||||||
|
erlang:raise(K, E, S)
|
||||||
|
end.
|
||||||
|
|
|
@ -95,6 +95,8 @@ all() ->
|
||||||
|
|
||||||
resilience_tests() ->
|
resilience_tests() ->
|
||||||
[ t_acl_superuser_no_connection
|
[ t_acl_superuser_no_connection
|
||||||
|
, t_available_acl_query_no_connection
|
||||||
|
, t_available_acl_query_timeout
|
||||||
, t_authn_no_connection
|
, t_authn_no_connection
|
||||||
, t_available
|
, t_available
|
||||||
].
|
].
|
||||||
|
@ -105,7 +107,6 @@ groups() ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1),
|
emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1),
|
||||||
init_mongo_data(),
|
|
||||||
%% avoid inter-suite flakiness
|
%% avoid inter-suite flakiness
|
||||||
ok = emqx_mod_acl_internal:unload([]),
|
ok = emqx_mod_acl_internal:unload([]),
|
||||||
Config.
|
Config.
|
||||||
|
@ -167,17 +168,27 @@ init_per_testcase(t_authn_full_selector_variables, Config) ->
|
||||||
, {<<"distinguished_name">>, <<"%d">>}
|
, {<<"distinguished_name">>, <<"%d">>}
|
||||||
],
|
],
|
||||||
reload({auth_query, [{selector, Selector}]}),
|
reload({auth_query, [{selector, Selector}]}),
|
||||||
|
init_mongo_data(),
|
||||||
[ {original_selector, OriginalSelector}
|
[ {original_selector, OriginalSelector}
|
||||||
, {selector, Selector}
|
, {selector, Selector}
|
||||||
| Config];
|
| Config];
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
init_mongo_data(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(t_authn_full_selector_variables, Config) ->
|
end_per_testcase(t_authn_full_selector_variables, Config) ->
|
||||||
OriginalSelector = ?config(original_selector, Config),
|
OriginalSelector = ?config(original_selector, Config),
|
||||||
reload({auth_query, [{selector, OriginalSelector}]}),
|
reload({auth_query, [{selector, OriginalSelector}]}),
|
||||||
|
deinit_mongo_data(),
|
||||||
|
ok;
|
||||||
|
end_per_testcase(t_available_acl_query_timeout, Config) ->
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
reset_proxy(ProxyHost, ProxyPort),
|
||||||
|
deinit_mongo_data(),
|
||||||
ok;
|
ok;
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
deinit_mongo_data(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_mongo_data() ->
|
init_mongo_data() ->
|
||||||
|
@ -198,6 +209,10 @@ deinit_mongo_data() ->
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% for full coverage ;-)
|
||||||
|
t_description(_Config) ->
|
||||||
|
?assert(is_list(emqx_auth_mongo:description())).
|
||||||
|
|
||||||
t_check_auth(_) ->
|
t_check_auth(_) ->
|
||||||
Plain = #{zone => external, clientid => <<"client1">>, username => <<"plain">>},
|
Plain = #{zone => external, clientid => <<"client1">>, username => <<"plain">>},
|
||||||
Plain1 = #{zone => external, clientid => <<"client1">>, username => <<"plain2">>},
|
Plain1 = #{zone => external, clientid => <<"client1">>, username => <<"plain2">>},
|
||||||
|
@ -325,8 +340,6 @@ t_available(Config) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{error, {mongo_error, 2}},
|
{error, {mongo_error, 2}},
|
||||||
emqx_auth_mongo:available(Pool, SuperCollection, EmptySelector, fun error_code_query/3)),
|
emqx_auth_mongo:available(Pool, SuperCollection, EmptySelector, fun error_code_query/3)),
|
||||||
%% some error;
|
|
||||||
todo,
|
|
||||||
%% exception.
|
%% exception.
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{error, _},
|
{error, _},
|
||||||
|
@ -408,6 +421,15 @@ t_authn_no_connection(Config) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% tests query_multi failure
|
||||||
|
t_available_acl_query_no_connection(Config) ->
|
||||||
|
test_acl_query_failure(down, Config).
|
||||||
|
|
||||||
|
%% ensure query_multi has a timeout
|
||||||
|
t_available_acl_query_timeout(Config) ->
|
||||||
|
ct:timetrap(90000),
|
||||||
|
test_acl_query_failure(timeout, Config).
|
||||||
|
|
||||||
t_acl_superuser_no_connection(Config) ->
|
t_acl_superuser_no_connection(Config) ->
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
@ -460,6 +482,41 @@ t_acl_superuser_no_connection(Config) ->
|
||||||
%% Utils
|
%% Utils
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
test_acl_query_failure(FailureType, Config) ->
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
ACLQuery = aclquery(),
|
||||||
|
|
||||||
|
?check_trace(
|
||||||
|
try
|
||||||
|
?force_ordering(
|
||||||
|
#{?snk_kind := emqx_auth_mongo_query_multi_enter},
|
||||||
|
#{?snk_kind := connection_will_cut}
|
||||||
|
),
|
||||||
|
?force_ordering(
|
||||||
|
#{?snk_kind := connection_cut},
|
||||||
|
#{?snk_kind := emqx_auth_mongo_query_multi_find_selector}
|
||||||
|
),
|
||||||
|
spawn(fun() ->
|
||||||
|
?tp(connection_will_cut, #{}),
|
||||||
|
enable_failure(FailureType, ProxyHost, ProxyPort),
|
||||||
|
?tp(connection_cut, #{})
|
||||||
|
end),
|
||||||
|
Pool = ?APP,
|
||||||
|
%% query_multi returns an empty list even on failures.
|
||||||
|
?assertMatch(ok, emqx_auth_mongo:available(Pool, ACLQuery)),
|
||||||
|
ok
|
||||||
|
after
|
||||||
|
heal_failure(FailureType, ProxyHost, ProxyPort)
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch(
|
||||||
|
[#{?snk_kind := emqx_auth_mongo_query_multi_error , error := _}],
|
||||||
|
?of_kind(emqx_auth_mongo_query_multi_error, Trace))
|
||||||
|
end),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
reload({Par, Vals}) when is_list(Vals) ->
|
reload({Par, Vals}) when is_list(Vals) ->
|
||||||
application:stop(?APP),
|
application:stop(?APP),
|
||||||
{ok, TupleVals} = application:get_env(?APP, Par),
|
{ok, TupleVals} = application:get_env(?APP, Par),
|
||||||
|
@ -476,6 +533,9 @@ reload({Par, Vals}) when is_list(Vals) ->
|
||||||
superquery() ->
|
superquery() ->
|
||||||
emqx_auth_mongo_app:with_env(super_query, fun(SQ) -> SQ end).
|
emqx_auth_mongo_app:with_env(super_query, fun(SQ) -> SQ end).
|
||||||
|
|
||||||
|
aclquery() ->
|
||||||
|
emqx_auth_mongo_app:with_env(acl_query, fun(SQ) -> SQ end).
|
||||||
|
|
||||||
%% TODO: any easier way to make mongo return a map with an error code???
|
%% TODO: any easier way to make mongo return a map with an error code???
|
||||||
error_code_query(Pool, Collection, Selector) ->
|
error_code_query(Pool, Collection, Selector) ->
|
||||||
%% should be a query; this is to provoke an error return from
|
%% should be a query; this is to provoke an error return from
|
||||||
|
|
Loading…
Reference in New Issue