diff --git a/.ci/docker-compose-file/conf.env b/.ci/docker-compose-file/conf.env index 93dfecd2b..0141a6b4b 100644 --- a/.ci/docker-compose-file/conf.env +++ b/.ci/docker-compose-file/conf.env @@ -1,5 +1,5 @@ EMQX_AUTH__LDAP__SERVERS=ldap_server -EMQX_AUTH__MONGO__SERVER=mongo_server:27017 +EMQX_AUTH__MONGO__SERVER=toxiproxy:27017 EMQX_AUTH__MYSQL__SERVER=mysql_server:3306 EMQX_AUTH__MYSQL__USERNAME=root EMQX_AUTH__MYSQL__PASSWORD=public diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml new file mode 100644 index 000000000..005ac40d0 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -0,0 +1,16 @@ +version: '3.9' + +services: + toxiproxy: + container_name: toxiproxy + image: ghcr.io/shopify/toxiproxy:2.5.0 + restart: always + networks: + - emqx_bridge + volumes: + - "./toxiproxy.json:/config/toxiproxy.json" + ports: + - 8474:8474 + command: + - "-host=0.0.0.0" + - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json new file mode 100644 index 000000000..7079b0599 --- /dev/null +++ b/.ci/docker-compose-file/toxiproxy.json @@ -0,0 +1,8 @@ +[ + { + "name": "mongo", + "listen": "0.0.0.0:27017", + "upstream": "mongo:27017", + "enabled": true + } +] diff --git a/.github/workflows/run_cts_tests.yaml b/.github/workflows/run_cts_tests.yaml index 6b05a014e..4b2f2fd0b 100644 --- a/.github/workflows/run_cts_tests.yaml +++ b/.github/workflows/run_cts_tests.yaml @@ -82,6 +82,7 @@ jobs: - name: docker-compose up run: | docker-compose \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-${{ matrix.connect_type }}.yaml \ -f .ci/docker-compose-file/docker-compose.yaml \ up -d --build diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 04391f06c..6d7d0d715 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -54,6 +54,7 @@ jobs: run: | docker-compose \ -f .ci/docker-compose-file/docker-compose.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ @@ -81,6 +82,7 @@ jobs: run: | docker-compose \ -f .ci/docker-compose-file/docker-compose.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ diff --git a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl index f7071bc17..8529fb143 100644 --- a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl @@ -408,7 +408,7 @@ t_password_hash(_) -> ok = application:start(emqx_auth_mnesia). t_will_message_connection_denied(Config) when is_list(Config) -> - ClientId = Username = <<"subscriber">>, + ClientId = <<"subscriber">>, Password = <<"p">>, application:stop(emqx_auth_mnesia), ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]), diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl index bfb911707..1c9d1e879 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ check/3 , description/0 @@ -38,14 +39,22 @@ , available/3 ]). +-ifdef(TEST). +-export([ is_superuser/3 + , available/4 + ]). +-endif. + check(ClientInfo = #{password := Password}, AuthResult, Env = #{authquery := AuthQuery, superquery := SuperQuery}) -> + ?tp(emqx_auth_mongo_superuser_check_authn_enter, #{}), #authquery{collection = Collection, field = Fields, hash = HashType, selector = Selector} = AuthQuery, Pool = maps:get(pool, Env, ?APP), case query(Pool, Collection, maps:from_list(replvars(Selector, ClientInfo))) of undefined -> ok; {error, Reason} -> + ?tp(emqx_auth_mongo_superuser_check_authn_error, #{error => Reason}), ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), {stop, AuthResult#{auth_result => not_authorized, anonymous => false}}; UserMap -> @@ -58,6 +67,7 @@ check(ClientInfo = #{password := Password}, AuthResult, end, case Result of ok -> + ?tp(emqx_auth_mongo_superuser_check_authn_ok, #{}), {stop, AuthResult#{is_superuser => is_superuser(Pool, SuperQuery, ClientInfo), anonymous => false, auth_result => success}}; @@ -81,17 +91,24 @@ description() -> "Authentication with MongoDB". is_superuser(_Pool, undefined, _ClientInfo) -> false; is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) -> - case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of - undefined -> false; - {error, Reason} -> - ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), - false; - Row -> - case maps:get(Field, Row, false) of - true -> true; - _False -> false - end - end. + ?tp(emqx_auth_mongo_superuser_query_enter, #{}), + Res = + case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of + undefined -> + %% returned when there are no returned documents + false; + {error, Reason} -> + ?tp(emqx_auth_mongo_superuser_query_error, #{error => Reason}), + ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), + false; + Row -> + case maps:get(Field, Row, false) of + true -> true; + _False -> false + end + end, + ?tp(emqx_auth_mongo_superuser_query_result, #{is_superuser => Res}), + Res. %%-------------------------------------------------------------------- %% Availability Test @@ -169,7 +186,12 @@ connect(Opts) -> mongo_api:connect(Type, Hosts, Options, WorkerOptions). query(Pool, Collection, Selector) -> - ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end). + try + ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end) + catch + Err:Reason -> + {error, {Err, Reason}} + end. query_multi(Pool, Collection, SelectorList) -> lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) -> diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl index a63aa8193..ed8b68a68 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl @@ -30,6 +30,10 @@ , stop/1 ]). +-ifdef(TEST). +-export([with_env/2]). +-endif. + %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- diff --git a/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl b/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl index 66f5253d0..6e85a66a2 100644 --- a/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl +++ b/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl @@ -19,42 +19,96 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_auth_mongo.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). - --define(APP, emqx_auth_mongo). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(POOL(App), ecpool_worker:client(gproc_pool:pick_worker({ecpool, App}))). -define(MONGO_CL_ACL, <<"mqtt_acl">>). -define(MONGO_CL_USER, <<"mqtt_user">>). --define(INIT_ACL, [{<<"username">>, <<"testuser">>, <<"clientid">>, <<"null">>, <<"subscribe">>, [<<"#">>]}, - {<<"username">>, <<"dashboard">>, <<"clientid">>, <<"null">>, <<"pubsub">>, [<<"$SYS/#">>]}, - {<<"username">>, <<"user3">>, <<"clientid">>, <<"null">>, <<"publish">>, [<<"a/b/c">>]}]). +-define(INIT_ACL, [ { <<"username">>, <<"testuser">> + , <<"clientid">>, <<"null">> + , <<"subscribe">>, [<<"#">>] + } + , { <<"username">>, <<"dashboard">> + , <<"clientid">>, <<"null">> + , <<"pubsub">>, [<<"$SYS/#">>] + } + , { <<"username">>, <<"user3">> + , <<"clientid">>, <<"null">> + , <<"publish">>, [<<"a/b/c">>] + } + ]). --define(INIT_AUTH, [{<<"username">>, <<"plain">>, <<"password">>, <<"plain">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, true}, - {<<"username">>, <<"md5">>, <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, - {<<"username">>, <<"sha">>, <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, - {<<"username">>, <<"sha256">>, <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, - {<<"username">>, <<"pbkdf2_password">>, <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">>, <<"salt">>, <<"ATHENA.MIT.EDUraeburn">>, <<"is_superuser">>, false}, - {<<"username">>, <<"bcrypt_foo">>, <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">>, <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">>, <<"is_superuser">>, false} - ]). +-define(INIT_AUTH, [ { <<"username">>, <<"plain">> + , <<"password">>, <<"plain">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, true + } + , { <<"username">>, <<"md5">> + , <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"sha">> + , <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"sha256">> + , <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"pbkdf2_password">> + , <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">> + , <<"salt">>, <<"ATHENA.MIT.EDUraeburn">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"bcrypt_foo">> + , <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">> + , <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"user_full">> + , <<"clientid">>, <<"client_full">> + , <<"common_name">>, <<"cn_full">> + , <<"distinguished_name">>, <<"dn_full">> + , <<"password">>, <<"plain">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + ]). %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- all() -> - emqx_ct:all(?MODULE). + OtherTCs = emqx_ct:all(?MODULE) -- resilience_tests(), + [ {group, resilience} + | OtherTCs]. -init_per_suite(Cfg) -> +resilience_tests() -> + [ t_acl_superuser_no_connection + , t_authn_no_connection + , t_available + ]. + +groups() -> + [ {resilience, resilience_tests()} + ]. + +init_per_suite(Config) -> emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1), init_mongo_data(), %% avoid inter-suite flakiness ok = emqx_mod_acl_internal:unload([]), - Cfg. + Config. end_per_suite(_Cfg) -> deinit_mongo_data(), @@ -69,6 +123,63 @@ set_special_confs(emqx) -> set_special_confs(_App) -> ok. +init_per_group(resilience, Config) -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPortStr = os:getenv("PROXY_PORT", "8474"), + ProxyPort = list_to_integer(ProxyPortStr), + reset_proxy(ProxyHost, ProxyPort), + ProxyServer = ProxyHost ++ ":27017", + {ok, OriginalServer} = application:get_env(emqx_auth_mongo, server), + OriginalServerMap = maps:from_list(OriginalServer), + NewServerMap = OriginalServerMap#{hosts => [ProxyServer]}, + NewServer = maps:to_list(NewServerMap), + emqx_ct_helpers:stop_apps([emqx_auth_mongo]), + Handler = + fun(App = emqx_auth_mongo) -> + application:set_env(emqx_auth_mongo, server, NewServer), + set_special_confs(App); + (App)-> + set_special_confs(App) + end, + emqx_ct_helpers:start_apps([emqx_auth_mongo], Handler), + [ {original_server, OriginalServer} + , {proxy_host, ProxyHost} + , {proxy_port, ProxyPort} + | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(resilience, Config) -> + OriginalServer = ?config(original_server, Config), + application:set_env(emqx_auth_mongo, server, OriginalServer), + emqx_ct_helpers:stop_apps([emqx_auth_mongo]), + emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(t_authn_full_selector_variables, Config) -> + {ok, AuthQuery} = application:get_env(emqx_auth_mongo, auth_query), + OriginalSelector = proplists:get_value(selector, AuthQuery), + Selector = [ {<<"username">>, <<"%u">>} + , {<<"clientid">>, <<"%c">>} + , {<<"common_name">>, <<"%C">>} + , {<<"distinguished_name">>, <<"%d">>} + ], + reload({auth_query, [{selector, Selector}]}), + [ {original_selector, OriginalSelector} + , {selector, Selector} + | Config]; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_authn_full_selector_variables, Config) -> + OriginalSelector = ?config(original_selector, Config), + reload({auth_query, [{selector, OriginalSelector}]}), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + init_mongo_data() -> %% Users {ok, Connection} = ?POOL(?APP), @@ -116,7 +227,96 @@ t_check_auth(_) -> {ok, #{is_superuser := false}} = emqx_access_control:authenticate(Pbkdf2#{password => <<"password">>}), reload({auth_query, [{password_hash, {salt, bcrypt}}]}), {ok, #{is_superuser := false}} = emqx_access_control:authenticate(Bcrypt#{password => <<"foo">>}), - {error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}). + {error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}), + %% bad field config + reload({auth_query, [{password_field, [<<"bad_field">>]}]}), + ?assertEqual({error, password_error}, + emqx_access_control:authenticate(Plain#{password => <<"plain">>})), + %% unknown username + Unknown = #{zone => unknown, clientid => <<"?">>, username => <<"?">>, password => <<"">>}, + ?assertEqual({error, not_authorized}, emqx_access_control:authenticate(Unknown)), + ok. + +t_authn_full_selector_variables(Config) -> + Selector = ?config(selector, Config), + ClientInfo = #{ zone => external + , clientid => <<"client_full">> + , username => <<"user_full">> + , cn => <<"cn_full">> + , dn => <<"dn_full">> + , password => <<"plain">> + }, + ?assertMatch({ok, _}, emqx_access_control:authenticate(ClientInfo)), + EnvFields = [ clientid + , username + , cn + , dn + ], + lists:foreach( + fun(Field) -> + UnauthorizedClientInfo = ClientInfo#{Field => <<"wrong">>}, + ?assertEqual({error, not_authorized}, + emqx_access_control:authenticate(UnauthorizedClientInfo), + #{ field => Field + , client_info => UnauthorizedClientInfo + , selector => Selector + }) + end, + EnvFields), + ok. + +%% authenticates, but superquery returns no documents +t_authn_empty_is_superuser_collection(_Config) -> + {ok, SuperQuery} = application:get_env(emqx_auth_mongo, super_query), + Collection = list_to_binary(proplists:get_value(collection, SuperQuery)), + reload({auth_query, [{password_hash, plain}]}), + Plain = #{zone => external, clientid => <<"client1">>, + username => <<"plain">>, password => <<"plain">>}, + ok = snabbkaffe:start_trace(), + ?force_ordering( + #{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok}, + #{?snk_kind := truncate_coll_enter}), + ?force_ordering( + #{?snk_kind := truncate_coll_done}, + #{?snk_kind := emqx_auth_mongo_superuser_query_enter}), + try + spawn_link(fun() -> + ?tp(truncate_coll_enter, #{}), + {ok, Conn} = ?POOL(?APP), + {true, _} = mongo_api:delete(Conn, Collection, _Selector = #{}), + ?tp(truncate_coll_done, #{}) + end), + ?assertMatch({ok, #{is_superuser := false}}, emqx_access_control:authenticate(Plain)), + ok = snabbkaffe:stop(), + ok + after + init_mongo_data() + end. + +t_available(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + Pool = ?APP, + SuperQuery = #superquery{collection = SuperCollection} = superquery(), + %% success; + ?assertEqual(ok, emqx_auth_mongo:available(Pool, SuperQuery)), + %% error with code; + EmptySelector = #{}, + ?assertEqual( + {error, {mongo_error, 2}}, + emqx_auth_mongo:available(Pool, SuperCollection, EmptySelector, fun error_code_query/3)), + %% some error; + todo, + %% exception. + ?assertMatch( + {error, _}, + with_failure(down, ProxyHost, ProxyPort, + fun() -> + Collection = <<"mqtt_user">>, + Selector = #{}, + emqx_auth_mongo:available(Pool, Collection, Selector) + end)), + ok. t_check_acl(_) -> {ok, Connection} = ?POOL(?APP), @@ -155,6 +355,87 @@ t_acl_super(_) -> end, emqtt:disconnect(C). +%% apparently, if the config is undefined in `emqx_auth_mongo_app:r', +%% this is allowed... +t_is_superuser_undefined(_Config) -> + Pool = ClientInfo = unused_in_this_case, + SuperQuery = undefined, + ?assertNot(emqx_auth_mongo:is_superuser(Pool, SuperQuery, ClientInfo)), + ok. + +t_authn_no_connection(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + FailureType = down, + {ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>}, + {username, <<"plain">>}, + {password, <<"plain">>}]), + unlink(C), + + ?check_trace( + try + enable_failure(FailureType, ProxyHost, ProxyPort), + {error, {unauthorized_client, _}} = emqtt:connect(C), + ok + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end, + fun(Trace) -> + %% fails with `{exit,{{{badmatch,{{error,closed},...' + ?assertMatch([_], ?of_kind(emqx_auth_mongo_superuser_check_authn_error, Trace)), + ok + end), + + ok. + +t_acl_superuser_no_connection(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + FailureType = down, + reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}), + {ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>}, + {username, <<"plain">>}, + {password, <<"plain">>}]), + unlink(C), + + ?check_trace( + try + ?force_ordering( + #{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok}, + #{?snk_kind := connection_will_cut} + ), + ?force_ordering( + #{?snk_kind := connection_cut}, + #{?snk_kind := emqx_auth_mongo_superuser_query_enter} + ), + spawn(fun() -> + ?tp(connection_will_cut, #{}), + enable_failure(FailureType, ProxyHost, ProxyPort), + ?tp(connection_cut, #{}) + end), + + {ok, _} = emqtt:connect(C), + ok = emqtt:disconnect(C), + ok + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end, + fun(Trace) -> + ?assertMatch( + [ #{ ?snk_kind := emqx_auth_mongo_superuser_query_error + , error := _ + } + , #{ ?snk_kind := emqx_auth_mongo_superuser_query_result + , is_superuser := false + } + ], + ?of_kind([ emqx_auth_mongo_superuser_query_error + , emqx_auth_mongo_superuser_query_result + ], Trace)) + end), + + ok. + %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- @@ -171,3 +452,87 @@ reload({Par, Vals}) when is_list(Vals) -> end, TupleVals), application:set_env(?APP, Par, lists:append(NewVals, Vals)), application:start(?APP). + +superquery() -> + emqx_auth_mongo_app:with_env(super_query, fun(SQ) -> SQ end). + +%% TODO: any easier way to make mongo return a map with an error code??? +error_code_query(Pool, Collection, Selector) -> + %% should be a query; this is to provoke an error return from + %% mongo. + WrongLimit = {}, + ecpool:with_client( + Pool, + fun(Conn) -> + mongoc:transaction_query( + Conn, + fun(Conf = #{pool := Worker}) -> + Query = mongoc:count_query(Conf, Collection, Selector, WrongLimit), + {_, Res} = mc_worker_api:command(Worker, Query), + Res + end) + end). + +%% TODO: move to ct helpers??? +reset_proxy(ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset", + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). + +with_failure(FailureType, ProxyHost, ProxyPort, Fun) -> + enable_failure(FailureType, ProxyHost, ProxyPort), + try + Fun() + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end. + +enable_failure(FailureType, ProxyHost, ProxyPort) -> + case FailureType of + down -> switch_proxy(off, ProxyHost, ProxyPort); + timeout -> timeout_proxy(on, ProxyHost, ProxyPort); + latency_up -> latency_up_proxy(on, ProxyHost, ProxyPort) + end. + +heal_failure(FailureType, ProxyHost, ProxyPort) -> + case FailureType of + down -> switch_proxy(on, ProxyHost, ProxyPort); + timeout -> timeout_proxy(off, ProxyHost, ProxyPort); + latency_up -> latency_up_proxy(off, ProxyHost, ProxyPort) + end. + +switch_proxy(Switch, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo", + Body = case Switch of + off -> <<"{\"enabled\":false}">>; + on -> <<"{\"enabled\":true}">> + end, + {ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). + +timeout_proxy(on, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics", + Body = <<"{\"name\":\"timeout\",\"type\":\"timeout\"," + "\"stream\":\"upstream\",\"toxicity\":1.0," + "\"attributes\":{\"timeout\":0}}">>, + {ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]); +timeout_proxy(off, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/timeout", + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). + +latency_up_proxy(on, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics", + Body = <<"{\"name\":\"latency_up\",\"type\":\"latency\"," + "\"stream\":\"upstream\",\"toxicity\":1.0," + "\"attributes\":{\"latency\":20000,\"jitter\":3000}}">>, + {ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]); +latency_up_proxy(off, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/latency_up", + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). diff --git a/rebar.config b/rebar.config index 19d199a6e..7cf994c36 100644 --- a/rebar.config +++ b/rebar.config @@ -59,7 +59,7 @@ , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}