%%--------------------------------------------------------------------
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% Common Test suite for the `emqx_resource' API, driven through the
%% `emqx_connector_demo' resource callback module.
-module(emqx_resource_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

%% Resource callback module exercised by every test case.
-define(TEST_RESOURCE, emqx_connector_demo).
%% Resource ids; `?ID' is always removed in `end_per_testcase/2'.
-define(ID, <<"id">>).
-define(ID1, <<"id1">>).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
%% Pattern for the error a query returns when the resource reports REASON.
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
%% Options map passed as the first argument of `?check_trace/3'.
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
%% NOTE(review): expands to the two tokens `emqx, resource'; not referenced
%% in the part of the suite reviewed here.
-define(TELEMETRY_PREFIX, emqx, resource).

%% `on_exit/1' registers a cleanup callback; presumably these are run by
%% `emqx_common_test_helpers:call_janitor/0' in `end_per_testcase/2'.
-import(emqx_common_test_helpers, [on_exit/1]).

all() ->
    emqx_common_test_helpers:all(?MODULE).

groups() ->
    [].

%% Per-test setup: 30s timetrap, reset the demo connector to the
%% `always_sync' callback mode (some tests switch it to `async_if_possible'),
%% and start a fresh snabbkaffe trace.
init_per_testcase(_, Config) ->
    ct:timetrap({seconds, 30}),
    emqx_connector_demo:set_callback_mode(always_sync),
    snabbkaffe:start_trace(),
    Config.

%% Per-test teardown: stop tracing, remove `?ID' (ignoring whether it still
%% exists) and run the registered `on_exit/1' callbacks.
end_per_testcase(_, _Config) ->
    snabbkaffe:stop(),
    _ = emqx_resource:remove_local(?ID),
    emqx_common_test_helpers:call_janitor(),
    ok.

%% Load the demo resource module and start `emqx_conf' and `emqx_resource'.
init_per_suite(Config) ->
    code:ensure_loaded(?TEST_RESOURCE),
    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
    {ok, _} = application:ensure_all_started(emqx_resource),
    Config.

end_per_suite(_Config) ->
    ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------

%% The demo connector module must be reported among the known resource types.
t_list_types(_) ->
    KnownTypes = emqx_resource:list_types(),
    ?assert(lists:member(?TEST_RESOURCE, KnownTypes)).

%% Both the raw (binary) and the parsed config validate; garbage is rejected.
t_check_config(_) ->
    Check = fun(RawConf) -> emqx_resource:check_config(?TEST_RESOURCE, RawConf) end,
    {ok, #{}} = Check(bin_config()),
    {ok, #{}} = Check(config()),
    {error, _} = Check(<<"not a config">>),
    {error, _} = Check(#{invalid => config}).

%% Lifecycle: an invalid config is refused, a valid one is created and
%% recreated, and (idempotent) removal terminates the resource's process.
t_create_remove(_) ->
    ?check_trace(
        begin
            ResConf = #{name => test_resource},
            ?assertMatch(
                {error, _},
                emqx_resource:check_and_create_local(
                    ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{unknown => test_resource}
                )
            ),
            ?assertMatch(
                {ok, _},
                create(?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, ResConf)
            ),
            ?assertMatch(
                {ok, _},
                emqx_resource:recreate_local(?ID, ?TEST_RESOURCE, ResConf, #{})
            ),
            {ok, #{pid := ResPid}} = emqx_resource:query(?ID, get_state),
            ?assert(is_process_alive(ResPid)),
            %% removing twice succeeds both times
            ?assertEqual(ok, emqx_resource:remove_local(?ID)),
            ?assertEqual(ok, emqx_resource:remove_local(?ID)),
            ?assertNot(is_process_alive(ResPid))
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).
%% Like `t_create_remove/1', but additionally recreates the resource after
%% flagging it as `connecting', and checks that queries against a removed
%% resource return `{error, not_found}'.
t_create_remove_local(_) ->
    ?check_trace(
        begin
            ?assertMatch(
                {error, _},
                emqx_resource:check_and_create_local(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{unknown => test_resource}
                )
            ),

            ?assertMatch(
                {ok, _},
                create(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{name => test_resource}
                )
            ),

            %% Same call, in the same state, as in `t_create_remove/1', where
            %% the result is asserted; assert it here too instead of ignoring it.
            ?assertMatch(
                {ok, _},
                emqx_resource:recreate_local(
                    ?ID,
                    ?TEST_RESOURCE,
                    #{name => test_resource},
                    #{}
                )
            ),
            {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
            ?assert(is_process_alive(Pid)),

            %% recreate while the resource is flagged as `connecting'
            emqx_resource:set_resource_status_connecting(?ID),
            %% NOTE(review): this result is not asserted; only the trace
            %% checks below observe this step.
            emqx_resource:recreate_local(
                ?ID,
                ?TEST_RESOURCE,
                #{name => test_resource},
                #{}
            ),

            %% removal is idempotent
            ?assertEqual(ok, emqx_resource:remove_local(?ID)),
            ?assertMatch(ok, emqx_resource:remove_local(?ID)),

            ?assertMatch(
                {error, not_found},
                emqx_resource:query(?ID, get_state)
            ),

            ?assertNot(is_process_alive(Pid))
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).

%% With `start_after_created => false' the resource stays `stopped' until it
%% is started explicitly; a subsequent restart replaces its process.
t_do_not_start_after_created(_) ->
    ?check_trace(
        begin
            ?assertMatch(
                {ok, _},
                create(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{name => test_resource},
                    #{start_after_created => false}
                )
            ),
            %% the resource should remain `stopped' after being created
            timer:sleep(200),
            ?assertMatch(
                ?RESOURCE_ERROR(stopped),
                emqx_resource:query(?ID, get_state)
            ),
            ?assertMatch(
                {ok, _, #{status := stopped}},
                emqx_resource:get_instance(?ID)
            ),

            %% start the resource manually..
            ?assertEqual(ok, emqx_resource:start(?ID)),
            {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
            ?assert(is_process_alive(Pid)),

            %% restart the resource: the old process goes away
            ?assertEqual(ok, emqx_resource:restart(?ID)),
            ?assertNot(is_process_alive(Pid)),
            {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
            ?assert(is_process_alive(Pid2)),

            ?assertEqual(ok, emqx_resource:remove_local(?ID)),

            ?assertNot(is_process_alive(Pid2))
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).
%% A created resource answers `get_state'; an unknown id yields `not_found'.
t_query(_) ->
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource}
    ),
    {ok, #{pid := _}} = emqx_resource:query(?ID, get_state),
    UnknownId = <<"unknown">>,
    ?assertMatch({error, not_found}, emqx_resource:query(UnknownId, get_state)),
    ok = emqx_resource:remove_local(?ID).

%% Synchronous increments are immediately visible through `get_counter'.
t_query_counter(_) ->
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource, register => true}
    ),
    Query = fun(Request) -> emqx_resource:query(?ID, Request) end,
    {ok, 0} = Query(get_counter),
    ok = Query({inc_counter, 1}),
    {ok, 1} = Query(get_counter),
    ok = Query({inc_counter, 5}),
    {ok, 6} = Query(get_counter),
    ok = emqx_resource:remove_local(?ID).

%% With batching in sync mode, a lone query is sent as a one-element batch,
%% while concurrent increments are coalesced into multi-element batches.
t_batch_query_counter(_) ->
    BatchSize = 100,
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource, register => true},
        #{batch_size => BatchSize, batch_time => 100, query_mode => sync}
    ),

    ?check_trace(
        ?TRACE_OPTS,
        emqx_resource:query(?ID, get_counter),
        fun(Result, Trace) ->
            ?assertMatch({ok, 0}, Result),
            ?assertMatch(
                [#{batch := [{query, _, get_counter, _, _}]}],
                ?of_kind(call_batch_query, Trace)
            )
        end
    ),

    TotalMsgs = 1_000,
    ?check_trace(
        ?TRACE_OPTS,
        begin
            %% wait for ceil(TotalMsgs / BatchSize) increment events
            ExpectedBatches = (TotalMsgs + BatchSize - 1) div BatchSize,
            {ok, Sub} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := connector_demo_inc_counter}),
                ExpectedBatches,
                _Timeout = 10_000
            ),
            inc_counter_in_parallel(TotalMsgs),
            {ok, _} = snabbkaffe:receive_events(Sub),
            ok
        end,
        fun(Trace) ->
            %% at least one batch must carry two or more requests
            MultiBatches = [
                Ev
             || Ev = #{?snk_kind := call_batch_query, batch := [_, _ | _]} <- Trace
            ],
            ?assertMatch([_ | _], MultiBatches)
        end
    ),
    {ok, TotalMsgs} = emqx_resource:query(?ID, get_counter),
    ok = emqx_resource:remove_local(?ID).
%% Async query mode while the demo connector runs in `always_sync' callback
%% mode: 1000 parallel increments all reach the connector, and a simple sync
%% query (which bypasses query mode and batching) then reads the counter back.
t_query_counter_async_query(_) ->
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource, register => true},
        #{
            query_mode => async,
            batch_size => 1,
            metrics_flush_interval => 50
        }
    ),
    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
    NMsgs = 1_000,
    ?check_trace(
        ?TRACE_OPTS,
        begin
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := connector_demo_inc_counter}),
                NMsgs,
                _Timeout = 60_000
            ),
            inc_counter_in_parallel(NMsgs),
            {ok, _} = snabbkaffe:receive_events(SRef),
            ok
        end,
        fun(Trace) ->
            %% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
            QueryTrace = ?of_kind(call_query, Trace),
            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
        end
    ),
    %% simple query ignores the query_mode and batching settings in the resource_worker
    ?check_trace(
        ?TRACE_OPTS,
        emqx_resource:simple_sync_query(?ID, get_counter),
        fun(Result, Trace) ->
            ?assertMatch({ok, 1000}, Result),
            %% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
            QueryTrace = ?of_kind(call_query, Trace),
            ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
        end
    ),
    %% 1000 increments + 2 `get_counter' simple queries. Metrics are flushed
    %% periodically (`metrics_flush_interval' above), so they must be re-read
    %% on every attempt: matching a snapshot taken once outside `?retry'
    %% would make the retry pointless.
    ?retry(
        _Sleep = 300,
        _Attempts0 = 20,
        ?assertMatch(
            #{counters := #{matched := 1002, 'success' := 1002, 'failed' := 0}},
            emqx_resource:get_metrics(?ID)
        )
    ),
    ok = emqx_resource:remove_local(?ID).
%% Async query mode with the demo connector in `async_if_possible' callback
%% mode: each of the 1000 parallel increments must invoke its
%% `async_reply_fun', which records the reply in `Tab0'.
t_query_counter_async_callback(_) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),

    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
    Insert = fun(Tab, Result) ->
        ets:insert(Tab, {make_ref(), Result})
    end,
    ReqOpts = #{async_reply_fun => {Insert, [Tab0]}},
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource, register => true},
        #{
            query_mode => async,
            batch_size => 1,
            inflight_window => 1000000
        }
    ),
    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
    NMsgs = 1_000,
    ?check_trace(
        ?TRACE_OPTS,
        begin
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                NMsgs,
                _Timeout = 60_000
            ),
            inc_counter_in_parallel(NMsgs, ReqOpts),
            {ok, _} = snabbkaffe:receive_events(SRef),
            ok
        end,
        fun(Trace) ->
            QueryTrace = ?of_kind(call_query_async, Trace),
            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
        end
    ),

    %% simple query ignores the query_mode and batching settings in the resource_worker
    ?check_trace(
        ?TRACE_OPTS,
        emqx_resource:simple_sync_query(?ID, get_counter),
        fun(Result, Trace) ->
            ?assertMatch({ok, 1000}, Result),
            QueryTrace = ?of_kind(call_query, Trace),
            ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
        end
    ),
    %% 1000 increments + 2 `get_counter' simple queries. Metrics may lag
    %% behind the replies, so re-read them on each attempt instead of
    %% asserting on a single snapshot.
    ?retry(
        _Sleep = 300,
        _Attempts0 = 20,
        ?assertMatch(
            #{counters := #{matched := 1002, 'success' := 1002, 'failed' := 0}},
            emqx_resource:get_metrics(?ID)
        )
    ),
    %% one recorded reply per increment, and every reply is `ok'
    ?assertEqual(1000, ets:info(Tab0, size)),
    ?assert(
        lists:all(
            fun
                ({_, ok}) -> true;
                (_) -> false
            end,
            ets:tab2list(Tab0)
        )
    ),
    ok = emqx_resource:remove_local(?ID).
%% Async query mode with a single buffer worker (`worker_pool_size => 1'),
%% `batch_size => 1' and an inflight window of `WindowSize'. Scenario:
%%   1. block the connector and fill the inflight window;
%%   2. send one more query while blocked; it is enqueued and, after the
%%      connector is resumed, retried until it succeeds;
%%   3. send `Num' queries that should all succeed;
%%   4. block and fill the window again, resume, then check that every reply
%%      recorded in `Tab0' is `ok' and that
%%      matched == success + dropped + queuing + inflight.
t_query_counter_async_inflight(_) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    %% Record every resource-metrics telemetry event (keyed by monotonic
    %% time) so the final assertion can dump the metrics history on failure.
    MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
    ok = telemetry:attach_many(
        ?FUNCTION_NAME,
        emqx_resource_metrics:events(),
        fun(Event, Measurements, Meta, _Config) ->
            ets:insert(
                MetricsTab,
                {erlang:monotonic_time(), #{
                    event => Event, measurements => Measurements, metadata => Meta
                }}
            ),
            ok
        end,
        unused_config
    ),
    on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),

    %% `Tab0' collects one `{Ref, Result}' entry per async reply.
    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
    Insert0 = fun(Tab, Ref, Result) ->
        ct:pal("inserting ~p", [{Ref, Result}]),
        ets:insert(Tab, {Ref, Result})
    end,
    %% A fun rather than a map, so each call yields a fresh reply ref.
    %% NOTE(review): assumes `inc_counter_in_parallel/2' calls it once per request.
    ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
    WindowSize = 15,
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource, register => true},
        #{
            query_mode => async,
            batch_size => 1,
            inflight_window => WindowSize,
            worker_pool_size => 1,
            resume_interval => 300
        }
    ),
    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),

    %% block the resource
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),

    %% send async query to make the inflight window full
    ?check_trace(
        {_, {ok, _}} =
            ?wait_async_action(
                %% one more so that inflight would be already full upon last query
                inc_counter_in_parallel(WindowSize + 1, ReqOpts),
                #{?snk_kind := buffer_worker_flush_but_inflight_full},
                1_000
            ),
        fun(Trace) ->
            QueryTrace = ?of_kind(call_query_async, Trace),
            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
        end
    ),
    tap_metrics(?LINE),
    %% nothing has been replied while the connector is blocked
    ?assertMatch(0, ets:info(Tab0, size)),

    tap_metrics(?LINE),
    %% send query now will fail because the resource is blocked.
    Insert = fun(Tab, Ref, Result) ->
        ct:pal("inserting ~p", [{Ref, Result}]),
        ets:insert(Tab, {Ref, Result}),
        ?tp(tmp_query_inserted, #{})
    end,
    %% since this counts as a failure, it'll be enqueued and retried
    %% later, when the resource is unblocked.
    {ok, {ok, _}} =
        ?wait_async_action(
            emqx_resource:query(?ID, {inc_counter, 99}, #{
                async_reply_fun => {Insert, [Tab0, tmp_query]}
            }),
            #{?snk_kind := buffer_worker_appended_to_queue},
            1_000
        ),
    tap_metrics(?LINE),

    %% all responses should be received after the resource is resumed.
    {ok, SRef0} = snabbkaffe:subscribe(
        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
        %% +2 because the tmp_query above will be retried and succeed
        %% this time.
        WindowSize + 2,
        _Timeout0 = 10_000
    ),
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
    tap_metrics(?LINE),
    {ok, _} = snabbkaffe:receive_events(SRef0),
    tap_metrics(?LINE),
    %% since the previous tmp_query was enqueued to be retried, we
    %% take it again from the table; this time, it should have
    %% succeeded. `ets:take/2' also removes it, so it is not counted below.
    ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),

    %% send async query, this time everything should be ok.
    Num = 10,
    ?check_trace(
        begin
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                Num,
                %% `_Timeout0' is already bound above; this re-matches the same value
                _Timeout0 = 10_000
            ),
            inc_counter_in_parallel_increasing(Num, 1, ReqOpts),
            {ok, _} = snabbkaffe:receive_events(SRef),
            ok
        end,
        fun(Trace) ->
            QueryTrace = ?of_kind(call_query_async, Trace),
            ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
            %% first round (WindowSize + 1) plus this round (Num)
            ?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
            tap_metrics(?LINE),
            ok
        end
    ),

    %% block the resource
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
    %% again, send async query to make the inflight window full
    ?check_trace(
        {_, {ok, _}} =
            ?wait_async_action(
                %% one more so that inflight would be already full upon last query
                inc_counter_in_parallel(WindowSize + 1, ReqOpts),
                #{?snk_kind := buffer_worker_flush_but_inflight_full},
                1_000
            ),
        fun(Trace) ->
            QueryTrace = ?of_kind(call_query_async, Trace),
            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
        end
    ),

    %% this will block the resource_worker
    ok =
        emqx_resource:query(?ID, {inc_counter, 4}),

    %% Replies expected in `Tab0': both rounds of WindowSize + 1 plus Num.
    %% `{inc_counter, 4}' has no reply fun, so it is not counted.
    Sent = WindowSize + 1 + Num + WindowSize + 1,
    {ok, SRef1} = snabbkaffe:subscribe(
        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
        WindowSize + 1,
        _Timeout0 = 10_000
    ),
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
    {ok, _} = snabbkaffe:receive_events(SRef1),
    ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
    tap_metrics(?LINE),

    %% the counter also includes increments that have no reply fun
    {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
    ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
    ?assert(Sent =< Counter),

    %% give the metrics some time to stabilize.
    ct:sleep(1000),
    #{counters := C, gauges := G} = tap_metrics(?LINE),
    ?assertMatch(
        #{
            counters :=
                #{matched := M, success := Ss, dropped := Dp},
            gauges := #{queuing := Qing, inflight := Infl}
        } when
            M == Ss + Dp + Qing + Infl,
        #{counters => C, gauges => G},
        #{
            metrics => #{counters => C, gauges => G},
            results => ets:tab2list(Tab0),
            metrics_trace => ets:tab2list(MetricsTab)
        }
    ),
    ?assert(
        lists:all(
            fun
                ({_, ok}) -> true;
                (_) -> false
            end,
            ets:tab2list(Tab0)
        )
    ),
    ok = emqx_resource:remove_local(?ID).
%% Batched variant of `t_query_counter_async_inflight/1': `BatchSize' messages
%% per batch and an inflight window of `WindowSize' (the test assumes the
%% window counts batches: `NumMsgs = BatchSize * WindowSize' fills it).
%% `Sent1'..`Sent4' track how many replies are expected in `Tab0' after each
%% phase.
t_query_counter_async_inflight_batch(_) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    %% Record every resource-metrics telemetry event (keyed by monotonic
    %% time) so the final assertion can dump the metrics history on failure.
    MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
    ok = telemetry:attach_many(
        ?FUNCTION_NAME,
        emqx_resource_metrics:events(),
        fun(Event, Measurements, Meta, _Config) ->
            ets:insert(
                MetricsTab,
                {erlang:monotonic_time(), #{
                    event => Event, measurements => Measurements, metadata => Meta
                }}
            ),
            ok
        end,
        unused_config
    ),
    on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),

    %% `Tab0' collects one `{Ref, Result}' entry per async reply.
    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
    Insert0 = fun(Tab, Ref, Result) ->
        ct:pal("inserting ~p", [{Ref, Result}]),
        ets:insert(Tab, {Ref, Result})
    end,
    %% A fun rather than a map, so each call yields a fresh reply ref.
    ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
    BatchSize = 2,
    WindowSize = 15,
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource, register => true},
        #{
            query_mode => async,
            batch_size => BatchSize,
            batch_time => 100,
            inflight_window => WindowSize,
            worker_pool_size => 1,
            resume_interval => 300
        }
    ),
    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),

    %% block the resource
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),

    %% send async query to make the inflight window full
    NumMsgs = BatchSize * WindowSize,
    ?check_trace(
        {_, {ok, _}} =
            ?wait_async_action(
                %% a batch more so that inflight would be already full upon last query
                inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
                #{?snk_kind := buffer_worker_flush_but_inflight_full},
                5_000
            ),
        fun(Trace) ->
            %% at least one full two-element batch of increments was sent
            QueryTrace = [
                Event
             || Event = #{
                    ?snk_kind := call_batch_query_async,
                    batch := [
                        {query, _, {inc_counter, 1}, _, _},
                        {query, _, {inc_counter, 1}, _, _}
                    ]
                } <-
                    Trace
            ],
            ?assertMatch([_ | _], QueryTrace)
        end
    ),
    tap_metrics(?LINE),

    Sent1 = NumMsgs + BatchSize,

    %% No trace checks (`[]'); `?check_trace' only runs the block under a trace.
    ?check_trace(
        begin
            %% this will block the resource_worker as the inflight window is full now
            {ok, {ok, _}} =
                ?wait_async_action(
                    emqx_resource:query(?ID, {inc_counter, 2}, ReqOpts()),
                    #{?snk_kind := buffer_worker_flush_but_inflight_full},
                    5_000
                ),
            ?assertMatch(0, ets:info(Tab0, size)),
            ok
        end,
        []
    ),

    %% `{inc_counter, 2}' used `ReqOpts()', so its reply is recorded too
    Sent2 = Sent1 + 1,

    tap_metrics(?LINE),
    %% send query now will fail because the resource is blocked.
    Insert = fun(Tab, Ref, Result) ->
        ct:pal("inserting ~p", [{Ref, Result}]),
        ets:insert(Tab, {Ref, Result}),
        ?tp(tmp_query_inserted, #{})
    end,
    %% since this counts as a failure, it'll be enqueued and retried
    %% later, when the resource is unblocked.
    {ok, {ok, _}} =
        ?wait_async_action(
            emqx_resource:query(?ID, {inc_counter, 3}, #{
                async_reply_fun => {Insert, [Tab0, tmp_query]}
            }),
            #{?snk_kind := buffer_worker_appended_to_queue},
            1_000
        ),
    tap_metrics(?LINE),

    %% all responses should be received after the resource is resumed.
    {ok, SRef0} = snabbkaffe:subscribe(
        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
        %% +2 because the tmp_query above will be retried and succeed
        %% this time.
        WindowSize + 2,
        5_000
    ),
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
    tap_metrics(?LINE),
    {ok, _} = snabbkaffe:receive_events(SRef0),
    %% since the previous tmp_query was enqueued to be retried, we
    %% take it again from the table; this time, it should have
    %% succeeded. `ets:take/2' removes it, so it is not part of `Sent2'.
    ?assertEqual([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
    ?assertEqual(Sent2, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
    tap_metrics(?LINE),

    %% send async query, this time everything should be ok.
    NumBatches1 = 3,
    NumMsgs1 = BatchSize * NumBatches1,
    ?check_trace(
        ?TRACE_OPTS,
        begin
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                NumBatches1,
                5_000
            ),
            inc_counter_in_parallel(NumMsgs1, ReqOpts),
            {ok, _} = snabbkaffe:receive_events(SRef),
            ok
        end,
        fun(Trace) ->
            QueryTrace = ?of_kind(call_batch_query_async, Trace),
            ?assertMatch(
                [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
                QueryTrace
            )
        end
    ),
    Sent3 = Sent2 + NumMsgs1,

    ?assertEqual(Sent3, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
    tap_metrics(?LINE),

    %% block the resource
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
    %% again, send async query to make the inflight window full
    ?check_trace(
        {_, {ok, _}} =
            ?wait_async_action(
                %% a batch more so that inflight would be already full upon last query
                inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
                #{?snk_kind := buffer_worker_flush_but_inflight_full},
                5_000
            ),
        fun(Trace) ->
            QueryTrace = ?of_kind(call_batch_query_async, Trace),
            ?assertMatch(
                [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
                QueryTrace
            )
        end
    ),

    Sent4 = Sent3 + NumMsgs + BatchSize,

    %% this will block the resource_worker
    %% (no reply fun, so it is not counted in `Sent4')
    ok = emqx_resource:query(?ID, {inc_counter, 1}),

    {ok, SRef1} = snabbkaffe:subscribe(
        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
        WindowSize + 1,
        5_000
    ),
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
    {ok, _} = snabbkaffe:receive_events(SRef1),
    ?assertEqual(Sent4, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),

    {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
    ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent4]),
    ?assert(Sent4 =< Counter),

    %% give the metrics some time to stabilize.
    ct:sleep(1000),
    #{counters := C, gauges := G} = tap_metrics(?LINE),
    ?assertMatch(
        #{
            counters :=
                #{matched := M, success := Ss, dropped := Dp},
            gauges := #{queuing := Qing, inflight := Infl}
        } when
            M == Ss + Dp + Qing + Infl,
        #{counters => C, gauges => G},
        #{
            metrics => #{counters => C, gauges => G},
            results => ets:tab2list(Tab0),
            metrics_trace => ets:tab2list(MetricsTab)
        }
    ),
    ?assert(
        lists:all(
            fun
                ({_, ok}) -> true;
                (_) -> false
            end,
            ets:tab2list(Tab0)
        )
    ),
    ok = emqx_resource:remove_local(?ID).

%% A resource whose health check does not answer within the interval makes
%% queries time out and is reported as `disconnected'.
%% NOTE(review): presumably the binary name `<<"bad_not_atom_name">>' is what
%% drives the demo connector into this state; confirm in emqx_connector_demo.
t_healthy_timeout(_) ->
    ?check_trace(
        begin
            ?assertMatch(
                {ok, _},
                create(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{name => <<"bad_not_atom_name">>, register => true},
                    %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
                    #{health_check_interval => 200}
                )
            ),
            ?assertMatch(
                {error, {resource_error, #{reason := timeout}}},
                emqx_resource:query(?ID, get_state, #{timeout => 1_000})
            ),
            ?assertMatch(
                {ok, _Group, #{status := disconnected}}, emqx_resource_manager:lookup(?ID)
            ),
            ?assertEqual(ok, emqx_resource:remove_local(?ID))
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).

%% An explicit health check brings a resource flagged `connecting' back to
%% `connected'; after its state process is killed, the next health check
%% reports `disconnected'.
t_healthy(_) ->
    ?check_trace(
        begin
            ?assertMatch(
                {ok, _},
                create(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{name => test_resource}
                )
            ),
            {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
            timer:sleep(300),
            emqx_resource:set_resource_status_connecting(?ID),

            ?assertEqual({ok, connected}, emqx_resource:health_check(?ID)),
            ?assertMatch(
                [#{status := connected}],
                emqx_resource:list_instances_verbose()
            ),

            erlang:exit(Pid, shutdown),

            ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)),
            ?assertMatch(
                [#{status := disconnected}],
                emqx_resource:list_instances_verbose()
            ),

            ?assertEqual(ok, emqx_resource:remove_local(?ID))
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).
%% A resource whose health check reports `{unhealthy_target, _}' is
%% `disconnected', keeps that error in its lookup info, and drops queries
%% (each counted as matched and as dropped due to the resource being stopped).
t_unhealthy_target(_) ->
    HealthCheckError = {unhealthy_target, "some message"},
    ?assertMatch(
        {ok, _},
        create(
            ?ID,
            ?DEFAULT_RESOURCE_GROUP,
            ?TEST_RESOURCE,
            #{name => test_resource, health_check_error => {msg, HealthCheckError}}
        )
    ),
    ?assertEqual(
        {ok, disconnected},
        emqx_resource:health_check(?ID)
    ),
    ?assertMatch(
        {ok, _Group, #{error := HealthCheckError}},
        emqx_resource_manager:lookup(?ID)
    ),
    %% messages are dropped when bridge is unhealthy
    lists:foreach(
        fun(_) ->
            ?assertMatch(
                {error, {resource_error, #{reason := unhealthy_target}}},
                emqx_resource:query(?ID, message)
            )
        end,
        lists:seq(1, 3)
    ),
    ?assertEqual(3, emqx_resource_metrics:matched_get(?ID)),
    ?assertEqual(3, emqx_resource_metrics:dropped_resource_stopped_get(?ID)).

%% Stop/restart lifecycle; also checks that the inflight gauge is reset both
%% by recreating the resource and by stopping it.
t_stop_start(_) ->
    ?check_trace(
        begin
            ?assertMatch(
                {error, _},
                emqx_resource:check_and_create_local(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{unknown => test_resource}
                )
            ),

            ?assertMatch(
                {ok, _},
                emqx_resource:check_and_create_local(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{<<"name">> => <<"test_resource">>}
                )
            ),

            %% set some per-worker inflight gauges (2 + 3); the recreate
            %% below is expected to reset them
            WorkerID0 = <<"worker:0">>,
            WorkerID1 = <<"worker:1">>,
            emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2),
            emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3),
            ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),

            ?assertMatch(
                {ok, _},
                emqx_resource:check_and_recreate_local(
                    ?ID,
                    ?TEST_RESOURCE,
                    #{<<"name">> => <<"test_resource">>},
                    #{}
                )
            ),

            {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),

            ?assert(is_process_alive(Pid0)),

            %% metrics are reset when recreating
            %% depending on timing, might show the request we just did.
            ct:sleep(500),
            ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),

            ok = emqx_resource:stop(?ID),

            ?assertNot(is_process_alive(Pid0)),

            ?assertMatch(
                ?RESOURCE_ERROR(stopped),
                emqx_resource:query(?ID, get_state)
            ),

            ?assertEqual(ok, emqx_resource:restart(?ID)),
            timer:sleep(300),

            {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),

            ?assert(is_process_alive(Pid1)),

            %% now set the gauges again and check that stopping resets them
            ct:sleep(500),
            emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
            emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
            ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
            ?assertEqual(ok, emqx_resource:stop(?ID)),
            ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).

%% Stop/restart lifecycle of a locally created resource: a stopped resource
%% answers queries with a `stopped' error, and a restart spawns a new process.
t_stop_start_local(_) ->
    ?check_trace(
        begin
            ?assertMatch(
                {error, _},
                emqx_resource:check_and_create_local(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{unknown => test_resource}
                )
            ),

            ?assertMatch(
                {ok, _},
                emqx_resource:check_and_create_local(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    #{<<"name">> => <<"test_resource">>}
                )
            ),

            ?assertMatch(
                {ok, _},
                emqx_resource:check_and_recreate_local(
                    ?ID,
                    ?TEST_RESOURCE,
                    #{<<"name">> => <<"test_resource">>},
                    #{}
                )
            ),

            {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),

            ?assert(is_process_alive(Pid0)),

            ?assertEqual(ok, emqx_resource:stop(?ID)),

            ?assertNot(is_process_alive(Pid0)),

            ?assertMatch(
                ?RESOURCE_ERROR(stopped),
                emqx_resource:query(?ID, get_state)
            ),

            ?assertEqual(ok, emqx_resource:restart(?ID)),

            {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),

            ?assert(is_process_alive(Pid1))
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).
%% Resources created in different groups are listed per group, each with its
%% own config.
t_list_filter(_) ->
    Cases = [{<<"group1">>, a}, {<<"group2">>, grouped_a}],
    lists:foreach(
        fun({Group, Name}) ->
            {ok, _} = create(
                emqx_resource:generate_id(<<"a">>),
                Group,
                ?TEST_RESOURCE,
                #{name => Name}
            )
        end,
        Cases
    ),
    lists:foreach(
        fun({Group, Name}) ->
            [Id] = emqx_resource:list_group_instances(Group),
            ?assertMatch(
                {ok, Group, #{config := #{name := Name}}},
                emqx_resource:get_instance(Id)
            )
        end,
        Cases
    ).

%% Repeated successful dry runs leave no instances behind.
t_create_dry_run_local(_) ->
    _ = [create_dry_run_local_succ() || _ <- lists:seq(1, 10)],
    ?retry(
        100,
        5,
        ?assertEqual([], emqx_resource:list_instances_verbose())
    ).

%% One successful dry run; afterwards the `test_resource' name is unregistered.
create_dry_run_local_succ() ->
    DryRunResult = emqx_resource:create_dry_run_local(
        ?TEST_RESOURCE,
        #{name => test_resource, register => true}
    ),
    ?assertEqual(ok, DryRunResult),
    ?assertEqual(undefined, whereis(test_resource)).

%% Dry runs fail on create or health-check errors, tolerate stop errors, and
%% never leave instances behind.
t_create_dry_run_local_failed(_) ->
    ct:timetrap({seconds, 120}),
    DryRun = fun(Conf) -> emqx_resource:create_dry_run_local(?TEST_RESOURCE, Conf) end,
    ct:pal("creating with creation error"),
    ?assertMatch({error, _}, DryRun(#{create_error => true})),
    ct:pal("creating with health check error"),
    ?assertMatch({error, _}, DryRun(#{name => test_resource, health_check_error => true})),
    ct:pal("creating with stop error"),
    ?assertEqual(ok, DryRun(#{name => test_resource, stop_error => true})),
    ?retry(
        100,
        5,
        ?assertEqual([], emqx_resource:list_instances_verbose())
    ).
%% Checks the `emqx_resource_validator' constructors: each returns a validator
%% fun that yields `ok' for valid input and `{error, Msg}' otherwise.
t_test_func(_) ->
    IsErrorMsgPlainString = fun({error, Msg}) -> io_lib:printable_list(Msg) end,

    %% valid input is accepted
    ?assertEqual(ok, (emqx_resource_validator:not_empty("not_empty"))(<<"someval">>)),
    ?assertEqual(ok, (emqx_resource_validator:min(int, 3))(4)),
    ?assertEqual(ok, (emqx_resource_validator:max(array, 10))([a, b, c, d])),
    ?assertEqual(ok, (emqx_resource_validator:max(string, 10))("less10")),

    %% invalid input is rejected with a plain-string message
    ?assert(IsErrorMsgPlainString((emqx_resource_validator:min(int, 66))(42))),
    ?assert(IsErrorMsgPlainString((emqx_resource_validator:max(int, 42))(66))),
    ?assert(IsErrorMsgPlainString((emqx_resource_validator:min(array, 3))([1, 2]))),
    ?assert(IsErrorMsgPlainString((emqx_resource_validator:max(array, 3))([1, 2, 3, 4]))),
    ?assert(IsErrorMsgPlainString((emqx_resource_validator:min(string, 3))("1"))),
    ?assert(IsErrorMsgPlainString((emqx_resource_validator:max(string, 3))("1234"))),

    %% `not_empty/1' returns the configured message: a (possibly deep)
    %% `io_lib:format/2' result comes back flat; binaries and maps unchanged.
    NestedMsg = io_lib:format("The answer: ~p", [42]),
    ExpectedMsg = "The answer: 42",
    BinMsg = <<"The answer: 42">>,
    MapMsg = #{question => "The question", answer => 42},
    NotEmptyNested = emqx_resource_validator:not_empty(NestedMsg),
    ?assertEqual({error, ExpectedMsg}, NotEmptyNested("")),
    ?assertEqual({error, ExpectedMsg}, NotEmptyNested(<<>>)),
    ?assertEqual({error, ExpectedMsg}, NotEmptyNested(undefined)),
    ?assertEqual(
        {error, BinMsg},
        (emqx_resource_validator:not_empty(BinMsg))(undefined)
    ),
    ?assertEqual(
        {error, MapMsg},
        (emqx_resource_validator:not_empty(MapMsg))("")
    ).
%% Resetting metrics must not disturb the running resource process.
t_reset_metrics(_) ->
    ResConf = #{name => test_resource},
    {ok, _} = create(?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, ResConf),
    {ok, #{pid := ResPid}} = emqx_resource:query(?ID, get_state),
    emqx_resource:reset_metrics(?ID),
    ?assert(is_process_alive(ResPid)),
    ok = emqx_resource:remove_local(?ID),
    ?assertNot(is_process_alive(ResPid)).

%% Creation reports success even when the connector fails to start.
t_auto_retry(_) ->
    ?assertMatch(
        {ok, _},
        create(
            ?ID,
            ?DEFAULT_RESOURCE_GROUP,
            ?TEST_RESOURCE,
            #{name => test_resource, create_error => true},
            #{health_check_interval => 100}
        )
    ).

%% tests resources that have an asynchronous start: they are created
%% without problems, but later some issue is found when calling the
%% health check.
t_start_throw_error(_Config) ->
    Message = "something went wrong",
    ResConf = #{name => test_resource, health_check_error => {msg, Message}},
    CreatedThenFailed = ?wait_async_action(
        create(
            ?ID,
            ?DEFAULT_RESOURCE_GROUP,
            ?TEST_RESOURCE,
            ResConf,
            #{health_check_interval => 100}
        ),
        #{?snk_kind := connector_demo_health_check_error},
        1_000
    ),
    ?assertMatch({{ok, _}, {ok, _}}, CreatedThenFailed),
    %% Now, if we try to "reconnect" (restart) it, we should get the error
    ?assertMatch({error, Message}, emqx_resource:start(?ID, #{})),
    ok.

%% A resource whose creation failed is reported as `disconnected'.
t_health_check_disconnected(_) ->
    ?check_trace(
        begin
            _ = create(
                ?ID,
                ?DEFAULT_RESOURCE_GROUP,
                ?TEST_RESOURCE,
                #{name => test_resource, create_error => true},
                #{health_check_interval => 100}
            ),
            HealthStatus = emqx_resource:health_check(?ID),
            ?assertEqual({ok, disconnected}, HealthStatus)
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
        end
    ).
%% Creating a second resource must not resume the (manually blocked) buffer
%% workers of the first one; resuming them explicitly does.
t_unblock_only_required_buffer_workers(_) ->
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 5,
            metrics_flush_interval => 50,
            batch_time => 100
        }
    ),
    lists:foreach(
        fun emqx_resource_buffer_worker:block/1,
        emqx_resource_buffer_worker_sup:worker_pids(?ID)
    ),
    %% assert success like for `?ID' above, so a failed creation surfaces
    %% here instead of as a confusing trace mismatch below
    {ok, _} = create(
        ?ID1,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 5,
            batch_time => 100
        }
    ),
    %% creation of `?ID1` should not have unblocked `?ID`'s buffer workers
    %% so we should see resumes now (`buffer_worker_enter_running`).
    ?check_trace(
        ?wait_async_action(
            lists:foreach(
                fun emqx_resource_buffer_worker:resume/1,
                emqx_resource_buffer_worker_sup:worker_pids(?ID)
            ),
            #{?snk_kind := buffer_worker_enter_running},
            5000
        ),
        fun(Trace) ->
            ?assertMatch(
                [#{id := ?ID} | _],
                ?of_kind(buffer_worker_enter_running, Trace)
            )
        end
    ).

%% A batch that fails while the connector is blocked is retried as a whole
%% (same elements, same order) until the connector is resumed, and the
%% retries do not inflate the `matched' metric.
t_retry_batch(_Config) ->
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 5,
            batch_time => 100,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => 1_000
        }
    ),
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),

    %% the `block' call itself counts as one matched request
    Matched0 = emqx_resource_metrics:matched_get(?ID),
    ?assertEqual(1, Matched0),

    %% these requests will batch together and fail; the buffer worker
    %% will enter the `blocked' state and they'll be retried later,
    %% after it unblocks.
    Payloads = lists:seq(1, 5),
    NumPayloads = length(Payloads),
    %% final counter value: the sum of all increments
    ExpectedCount = lists:sum(Payloads),

    ?check_trace(
        begin
            {ok, {ok, _}} =
                ?wait_async_action(
                    lists:foreach(
                        fun(N) ->
                            ok = emqx_resource:query(?ID, {inc_counter, N})
                        end,
                        Payloads
                    ),
                    #{?snk_kind := buffer_worker_enter_blocked},
                    5_000
                ),
            %% now the individual messages should have been counted
            Matched1 = emqx_resource_metrics:matched_get(?ID),
            ?assertEqual(Matched0 + NumPayloads, Matched1),

            %% wait for two more retries while the failure is enabled; the
            %% batch shall remain enqueued.
            {ok, _} =
                snabbkaffe:block_until(
                    ?match_n_events(2, #{?snk_kind := buffer_worker_retry_inflight_failed}),
                    5_000
                ),
            %% should not have increased the matched count with the retries
            Matched2 = emqx_resource_metrics:matched_get(?ID),
            ?assertEqual(Matched1, Matched2),

            %% now resume the connector so the buffer worker's next retry of
            %% the batch succeeds
            {ok, {ok, _}} =
                ?wait_async_action(
                    ok = emqx_resource:simple_sync_query(?ID, resume),
                    #{?snk_kind := buffer_worker_retry_inflight_succeeded},
                    5_000
                ),
            %% 1 more because of the `resume' call
            Matched3 = emqx_resource_metrics:matched_get(?ID),
            ?assertEqual(Matched2 + 1, Matched3),

            {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
            {Counter, Matched3}
        end,
        fun({Counter, Matched3}, Trace) ->
            %% 1 original attempt + 2 failed retries + final
            %% successful attempt.
            %% each time should be the original batch (no duplicate
            %% elements or reordering).
            ExpectedSeenPayloads = lists:flatten(lists:duplicate(4, Payloads)),
            Trace1 = lists:sublist(
                ?projection(n, ?of_kind(connector_demo_batch_inc_individual, Trace)),
                length(ExpectedSeenPayloads)
            ),
            ?assertEqual(ExpectedSeenPayloads, Trace1),
            ?assertMatch(
                [#{n := ExpectedCount}],
                ?of_kind(connector_demo_inc_counter, Trace)
            ),
            ?assertEqual(ExpectedCount, Counter),
            %% matched counts only the original requests, never the retries.
            %% `Matched3' already includes the `block' call, the original
            %% payloads and the `resume' call, so the only new match is the
            %% `get_counter' call.
            Matched4 = emqx_resource_metrics:matched_get(?ID),
            ?assertEqual(Matched3 + 1, Matched4),
            ok
        end
    ),
    ok.
%% Deleting a resource whose buffer holds queued and in-flight requests, and
%% then re-creating it under the same id, must yield a fresh resource with an
%% empty queue and nothing in flight.
t_delete_and_re_create_with_same_name(_Config) ->
    NumBufferWorkers = 2,
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 1,
            worker_pool_size => NumBufferWorkers,
            buffer_mode => volatile_offload,
            buffer_seg_bytes => 100,
            metrics_flush_interval => 50,
            resume_interval => 1_000
        }
    ),
    %% pre-condition: we should have just created a new queue
    Queuing0 = emqx_resource_metrics:queuing_get(?ID),
    Inflight0 = emqx_resource_metrics:inflight_get(?ID),
    ?assertEqual(0, Queuing0),
    ?assertEqual(0, Inflight0),
    ?check_trace(
        begin
            ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
            NumRequests = 10,
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := buffer_worker_enter_blocked}),
                NumBufferWorkers,
                _Timeout = 5_000
            ),
            %% ensure replayq offloads to disk
            Payload = binary:copy(<<"a">>, 119),
            lists:foreach(
                fun(N) ->
                    spawn_link(fun() ->
                        {error, _} =
                            emqx_resource:query(
                                ?ID,
                                {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
                            )
                    end)
                end,
                lists:seq(1, NumRequests)
            ),

            {ok, _} = snabbkaffe:receive_events(SRef),

            %% ensure that stuff got enqueued into disk
            tap_metrics(?LINE),
            ?retry(
                _Sleep = 300,
                _Attempts0 = 20,
                ?assert(emqx_resource_metrics:queuing_get(?ID) > 0)
            ),
            %% with `batch_size => 1', presumably one request is held in
            %% flight per blocked buffer worker.
            ?retry(
                _Sleep = 300,
                _Attempts0 = 20,
                ?assertEqual(NumBufferWorkers, emqx_resource_metrics:inflight_get(?ID))
            ),

            %% now, we delete the resource
            process_flag(trap_exit, true),
            ok = emqx_resource:remove_local(?ID),
            ?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),

            %% re-create the resource with the *same name*
            {{ok, _}, {ok, _Events}} =
                ?wait_async_action(
                    create(
                        ?ID,
                        ?DEFAULT_RESOURCE_GROUP,
                        ?TEST_RESOURCE,
                        #{name => test_resource},
                        #{
                            query_mode => async,
                            batch_size => 1,
                            worker_pool_size => NumBufferWorkers,
                            buffer_seg_bytes => 100,
                            resume_interval => 1_000
                        }
                    ),
                    #{?snk_kind := buffer_worker_enter_running},
                    5_000
                ),

            %% it shouldn't have anything enqueued or in flight, as it's a
            %% fresh resource
            Queuing2 = emqx_resource_metrics:queuing_get(?ID),
            Inflight2 = emqx_resource_metrics:inflight_get(?ID),
            ?assertEqual(0, Queuing2),
            ?assertEqual(0, Inflight2),

            ok
        end,
        []
    ),
    ok.

%% check that, if we configure a max queue size too small, then we
%% never send requests and always overflow.
t_always_overflow(_Config) ->
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 1,
            worker_pool_size => 1,
            max_buffer_bytes => 1,
            metrics_flush_interval => 50,
            resume_interval => 1_000
        }
    ),
    ?check_trace(
        begin
            Payload = binary:copy(<<"a">>, 100),
            %% the request can never fit into the 1-byte buffer, so it is
            %% never sent to the connector and the sync query fails with
            %% `buffer_overflow' (not `timeout').
            ?assertEqual(
                {error, buffer_overflow},
                emqx_resource:query(
                    ?ID,
                    {big_payload, Payload},
                    #{timeout => 500}
                )
            ),
            ok
        end,
        fun(Trace) ->
            ?assertEqual([], ?of_kind(call_query_enter, Trace)),
            ok
        end
    ),
    ok.

%% A sync query issued while the connector is blocked fails retriably, is
%% retried from the inflight table, and finally returns `ok' to the caller
%% once the connector is resumed.
t_retry_sync_inflight(_Config) ->
    ResumeInterval = 1_000,
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 1,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    QueryOpts = #{},
    ?check_trace(
        begin
            %% now really make the resource go into `blocked' state.
            %% this results in a retriable error when sync.
            ok = emqx_resource:simple_sync_query(?ID, block),
            TestPid = self(),
            {_, {ok, _}} =
                ?wait_async_action(
                    spawn_link(fun() ->
                        Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
                        TestPid ! {res, Res}
                    end),
                    #{?snk_kind := buffer_worker_retry_inflight_failed},
                    ResumeInterval * 2
                ),
            {ok, {ok, _}} =
                ?wait_async_action(
                    ok = emqx_resource:simple_sync_query(?ID, resume),
                    #{?snk_kind := buffer_worker_retry_inflight_succeeded},
                    ResumeInterval * 3
                ),
            receive
                {res, Res} ->
                    ?assertEqual(ok, Res)
            after 5_000 ->
                ct:fail("no response")
            end,
            ok
        end,
        [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
    ),
    ok.
%% Batched variant of t_retry_sync_inflight (batch_size 2, batch_time 200):
%% a sync query sent while the connector is blocked first fails a retry
%% (`buffer_worker_retry_inflight_failed'), then succeeds after `resume'
%% and the caller finally receives `ok'.
t_retry_sync_inflight_batch(_Config) ->
    ResumeInterval = 1_000,
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 2,
            batch_time => 200,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    QueryOpts = #{},
    ?check_trace(
        begin
            %% make the resource go into `blocked' state.  this
            %% results in a retriable error when sync.
            ok = emqx_resource:simple_sync_query(?ID, block),
            %% NOTE(review): unlike t_retry_sync_inflight, this case traps
            %% exits; the reason is not visible from this file.
            process_flag(trap_exit, true),
            TestPid = self(),
            %% the query runs in a linked helper so the test process can
            %% drive `resume' while the sync call is still pending; the
            %% helper forwards the final result as `{res, Res}'.
            {_, {ok, _}} =
                ?wait_async_action(
                    spawn_link(fun() ->
                        Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
                        TestPid ! {res, Res}
                    end),
                    #{?snk_kind := buffer_worker_retry_inflight_failed},
                    ResumeInterval * 2
                ),
            {ok, {ok, _}} =
                ?wait_async_action(
                    ok = emqx_resource:simple_sync_query(?ID, resume),
                    #{?snk_kind := buffer_worker_retry_inflight_succeeded},
                    ResumeInterval * 3
                ),
            receive
                {res, Res} ->
                    ?assertEqual(ok, Res)
            after 5_000 ->
                ct:fail("no response")
            end,
            ok
        end,
        [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
    ),
    ok.

%% An async query sent while the connector is blocked is retried from the
%% inflight table (at least one failed retry is observed) and the buffer
%% worker returns to `running' after `resume'.
t_retry_async_inflight(_Config) ->
    ResumeInterval = 1_000,
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 1,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    QueryOpts = #{},
    ?check_trace(
        begin
            %% block
            ok = emqx_resource:simple_sync_query(?ID, block),

            %% then send an async request; that should be retriable.
            {ok, {ok, _}} =
                ?wait_async_action(
                    emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
                    #{?snk_kind := buffer_worker_retry_inflight_failed},
                    ResumeInterval * 2
                ),

            %% will reply with success after the resource is healed
            {ok, {ok, _}} =
                ?wait_async_action(
                    emqx_resource:simple_sync_query(?ID, resume),
                    #{?snk_kind := buffer_worker_enter_running},
                    ResumeInterval * 2
                ),
            ok
        end,
        [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
    ),
    ok.

%% With an inflight window of 5, sending twice that many async requests
%% while the connector is blocked must fill the inflight table
%% (`buffer_worker_flush_but_inflight_full'); after `resume' the worker
%% runs again and the inflight gauge eventually drains back to 0.
t_retry_async_inflight_full(_Config) ->
    ResumeInterval = 1_000,
    AsyncInflightWindow = 5,
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => ?FUNCTION_NAME},
        #{
            query_mode => async,
            inflight_window => AsyncInflightWindow,
            batch_size => 1,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    ?check_trace(
        #{timetrap => 15_000},
        begin
            %% block
            ok = emqx_resource:simple_sync_query(?ID, block),

            %% each request asks the connector to sleep a random
            %% 250..500 ms (ResumeInterval / 4 plus up to another quarter)
            %% before replying; replies are only logged.
            {ok, {ok, _}} =
                ?wait_async_action(
                    inc_counter_in_parallel(
                        AsyncInflightWindow * 2,
                        fun() ->
                            For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4),
                            {sleep_before_reply, For}
                        end,
                        #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}}
                    ),
                    #{?snk_kind := buffer_worker_flush_but_inflight_full},
                    ResumeInterval * 2
                ),

            %% will reply with success after the resource is healed
            {ok, {ok, _}} =
                ?wait_async_action(
                    emqx_resource:simple_sync_query(?ID, resume),
                    #{?snk_kind := buffer_worker_enter_running}
                ),
            ok
        end,
        [
            fun(Trace) ->
                ?assertMatch([#{} | _], ?of_kind(buffer_worker_flush_but_inflight_full, Trace))
            end
        ]
    ),
    ?retry(
        _Sleep = 300,
        _Attempts0 = 20,
        ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
    ),
    ok.
%% this test case is to ensure the buffer worker will not go crazy even
%% if the underlying connector is misbehaving: evaluate async callbacks multiple times
t_async_reply_multi_eval(_Config) ->
    ResumeInterval = 5,
    TotalTime = 5_000,
    AsyncInflightWindow = 3,
    TotalQueries = AsyncInflightWindow * 5,
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => ?FUNCTION_NAME},
        #{
            query_mode => async,
            inflight_window => AsyncInflightWindow,
            batch_size => 3,
            batch_time => 10,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    %% block
    ok = emqx_resource:simple_sync_query(?ID, block),
    inc_counter_in_parallel(
        TotalQueries,
        fun() ->
            Rand = rand:uniform(1000),
            {random_reply, Rand}
        end,
        #{}
    ),
    %% NOTE(review): each attempt sleeps 2 * ResumeInterval, so the
    %% effective retry budget is roughly 2 * TotalTime.
    ?retry(
        2 * ResumeInterval,
        TotalTime div ResumeInterval,
        begin
            Metrics = tap_metrics(?LINE),
            #{
                counters := Counters,
                gauges := #{queuing := 0, inflight := 0}
            } = Metrics,
            #{
                matched := Matched,
                success := Success,
                dropped := Dropped,
                late_reply := LateReply,
                failed := Failed
            } = Counters,
            %% the one extra matched request is the `block' call above
            ?assertEqual(TotalQueries, Matched - 1),
            %% every matched request must end up in exactly one outcome
            ?assertEqual(Matched, Success + Dropped + LateReply + Failed, #{counters => Counters})
        end
    ).

%% Batched variant of t_retry_async_inflight (batch_size 2, batch_time 200).
t_retry_async_inflight_batch(_Config) ->
    ResumeInterval = 1_000,
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 2,
            batch_time => 200,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    QueryOpts = #{},
    ?check_trace(
        begin
            %% block
            ok = emqx_resource:simple_sync_query(?ID, block),

            %% then send an async request; that should be retriable.
            {ok, {ok, _}} =
                ?wait_async_action(
                    emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
                    #{?snk_kind := buffer_worker_retry_inflight_failed},
                    ResumeInterval * 2
                ),

            %% will reply with success after the resource is healed
            {ok, {ok, _}} =
                ?wait_async_action(
                    emqx_resource:simple_sync_query(?ID, resume),
                    #{?snk_kind := buffer_worker_enter_running},
                    ResumeInterval * 2
                ),
            ok
        end,
        [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
    ),
    ok.

%% check that we monitor async worker pids and abort their inflight
%% requests if they die.
t_async_pool_worker_death(_Config) ->
    ResumeInterval = 1_000,
    NumBufferWorkers = 2,
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 1,
            worker_pool_size => NumBufferWorkers,
            %% same option name as every other case in this suite (it was
            %% misspelled as `metrics_refresh_interval', which is ignored).
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
    Insert0 = fun(Tab, Ref, Result) ->
        ct:pal("inserting ~p", [{Ref, Result}]),
        ets:insert(Tab, {Ref, Result})
    end,
    %% a fresh reference per request, so each reply is stored separately
    ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
    ?check_trace(
        begin
            ok = emqx_resource:simple_sync_query(?ID, block),

            NumReqs = 10,
            {ok, SRef0} =
                snabbkaffe:subscribe(
                    ?match_event(#{?snk_kind := buffer_worker_appended_to_inflight}),
                    NumReqs,
                    1_000
                ),
            inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts),
            {ok, _} = snabbkaffe:receive_events(SRef0),

            ?retry(
                _Sleep = 300,
                _Attempts0 = 20,
                ?assertEqual(NumReqs, emqx_resource_metrics:inflight_get(?ID))
            ),

            %% grab one of the worker pids and kill it
            {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
            MRef = monitor(process, Pid0),
            ct:pal("will kill ~p", [Pid0]),
            exit(Pid0, kill),
            receive
                {'DOWN', MRef, process, Pid0, killed} ->
                    ct:pal("~p killed", [Pid0]),
                    ok
            after 200 ->
                ct:fail("worker should have died")
            end,

            %% inflight requests should have been marked as retriable
            wait_until_all_marked_as_retriable(NumReqs),
            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
            ?assertEqual(NumReqs, Inflight1),

            NumReqs
        end,
        fun(NumReqs, Trace) ->
            Events = ?of_kind(buffer_worker_async_agent_down, Trace),
            %% At least one buffer worker should have marked its
            %% requests as retriable.  If a single one has
            %% received all requests, that's all we got.
            ?assertMatch([_ | _], Events),
            %% All requests distributed over all buffer workers
            %% should have been marked as retriable, by the time
            %% the inflight has been drained.
            ?assertEqual(
                NumReqs,
                lists:sum([N || #{num_affected := N} <- Events])
            ),

            %% The `DOWN' signal must trigger the transition to the `blocked' state,
            %% otherwise the request won't be retried until the buffer worker is `blocked'
            %% for other reasons.
            ?assert(
                ?strict_causality(
                    #{?snk_kind := buffer_worker_async_agent_down, buffer_worker := _Pid0},
                    #{?snk_kind := buffer_worker_enter_blocked, buffer_worker := _Pid1},
                    _Pid0 =:= _Pid1,
                    Trace
                )
            ),
            ok
        end
    ),
    ok.

%% Sync, non-batched: a request that expires while the buffer worker is
%% blocked must be dropped before being sent.
t_expiration_sync_before_sending(_Config) ->
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 1,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => 1_000
        }
    ),
    do_t_expiration_before_sending(sync).

%% Sync, batched (batch_size 2) variant of t_expiration_sync_before_sending.
t_expiration_sync_batch_before_sending(_Config) ->
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 2,
            batch_time => 100,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => 1_000
        }
    ),
    do_t_expiration_before_sending(sync).
%% Async, non-batched: a request that expires while the buffer worker is
%% blocked must be dropped before being sent.
t_expiration_async_before_sending(_Config) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 1,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => 1_000
        }
    ),
    do_t_expiration_before_sending(async).

%% Async, batched (batch_size 2) variant of t_expiration_async_before_sending.
t_expiration_async_batch_before_sending(_Config) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 2,
            batch_time => 100,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => 1_000
        }
    ),
    do_t_expiration_before_sending(async).

%% Shared body for the "expires before sending" cases.  The resource must
%% already exist under `?ID'.  `QueryMode' (`sync' | `async') selects the
%% result the caller expects: a timeout error for sync, `ok' for async.
%%
%% Two `?force_ordering' constraints pin a window between the buffer worker
%% popping the request and sieving expired ones.  The `delay' helper sleeps
%% 2 * TimeoutMS inside that window, so the request is guaranteed to be
%% expired when it is sieved.
do_t_expiration_before_sending(QueryMode) ->
    ?check_trace(
        begin
            ok = emqx_resource:simple_sync_query(?ID, block),

            ?force_ordering(
                #{?snk_kind := buffer_worker_flush_before_pop},
                #{?snk_kind := delay_enter}
            ),
            ?force_ordering(
                #{?snk_kind := delay},
                #{?snk_kind := buffer_worker_flush_before_sieve_expired}
            ),

            TimeoutMS = 100,
            spawn_link(fun() ->
                case QueryMode of
                    sync ->
                        ?assertMatch(
                            {error, {resource_error, #{reason := timeout}}},
                            emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS})
                        );
                    async ->
                        ?assertEqual(
                            ok,
                            emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS})
                        )
                end
            end),
            %% sleeps past TimeoutMS between the two forced-ordering points
            spawn_link(fun() ->
                ?tp(delay_enter, #{}),
                ct:sleep(2 * TimeoutMS),
                ?tp(delay, #{}),
                ok
            end),

            {ok, _} = ?block_until(#{?snk_kind := buffer_worker_flush_all_expired}, 4 * TimeoutMS),
            ok
        end,
        fun(Trace) ->
            ?assertMatch(
                [#{batch := [{query, _, {inc_counter, 99}, _, _}]}],
                ?of_kind(buffer_worker_flush_all_expired, Trace)
            ),
            Metrics = tap_metrics(?LINE),
            ?assertMatch(
                #{
                    counters := #{
                        matched := 2,
                        %% the block call
                        success := 1,
                        dropped := 1,
                        'dropped.expired' := 1,
                        retried := 0,
                        failed := 0
                    }
                },
                Metrics
            ),
            ok
        end
    ),
    ok.
%% Sync, batched: only part of a batch (the short-timeout request) expires
%% before sending; the rest must still be delivered.
t_expiration_sync_before_sending_partial_batch(_Config) ->
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 2,
            batch_time => 100,
            worker_pool_size => 1,
            metrics_flush_interval => 250,
            resume_interval => 1_000
        }
    ),
    install_telemetry_handler(?FUNCTION_NAME),
    do_t_expiration_before_sending_partial_batch(sync).

%% Async variant of t_expiration_sync_before_sending_partial_batch.
t_expiration_async_before_sending_partial_batch(_Config) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 2,
            batch_time => 100,
            worker_pool_size => 1,
            metrics_flush_interval => 250,
            resume_interval => 1_000
        }
    ),
    install_telemetry_handler(?FUNCTION_NAME),
    do_t_expiration_before_sending_partial_batch(async).

%% Shared body for the partial-batch expiration cases.  Two requests are
%% batched together: `{inc_counter, 99}' with an infinite timeout and
%% `{inc_counter, 199}' with a 100 ms timeout.  The forced-ordering window
%% lets the second one expire before sieving.  The trace must show exactly
%% that split, and the metrics must account for one expired drop.
do_t_expiration_before_sending_partial_batch(QueryMode) ->
    ?check_trace(
        begin
            ok = emqx_resource:simple_sync_query(?ID, block),

            ?force_ordering(
                #{?snk_kind := buffer_worker_flush_before_pop},
                #{?snk_kind := delay_enter}
            ),
            ?force_ordering(
                #{?snk_kind := delay},
                #{?snk_kind := buffer_worker_flush_before_sieve_expired}
            ),

            %% never expires; signals when the caller gets its reply
            Pid0 =
                spawn_link(fun() ->
                    ?assertEqual(
                        ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
                    ),
                    ?tp(infinity_query_returned, #{})
                end),
            TimeoutMS = 100,
            %% expires inside the forced-ordering window
            Pid1 =
                spawn_link(fun() ->
                    case QueryMode of
                        sync ->
                            ?assertMatch(
                                {error, {resource_error, #{reason := timeout}}},
                                emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
                            );
                        async ->
                            ?assertEqual(
                                ok,
                                emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
                            )
                    end
                end),
            Pid2 =
                spawn_link(fun() ->
                    ?tp(delay_enter, #{}),
                    ct:sleep(2 * TimeoutMS),
                    ?tp(delay, #{}),
                    ok
                end),

            {ok, _} = ?block_until(
                #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
            ),
            ok = emqx_resource:simple_sync_query(?ID, resume),
            case QueryMode of
                async ->
                    {ok, _} = ?block_until(
                        #{
                            ?snk_kind := handle_async_reply,
                            action := ack,
                            batch_or_query := [{query, _, {inc_counter, 99}, _, _}]
                        },
                        10 * TimeoutMS
                    );
                sync ->
                    %% more time because it needs to retry if sync
                    {ok, _} = ?block_until(#{?snk_kind := infinity_query_returned}, 20 * TimeoutMS)
            end,

            %% unlink first so killing the helpers does not take the test down
            lists:foreach(
                fun(Pid) ->
                    unlink(Pid),
                    exit(Pid, kill)
                end,
                [Pid0, Pid1, Pid2]
            ),
            ok
        end,
        fun(Trace) ->
            ?assertMatch(
                [
                    #{
                        expired := [{query, _, {inc_counter, 199}, _, _}],
                        not_expired := [{query, _, {inc_counter, 99}, _, _}]
                    }
                ],
                ?of_kind(buffer_worker_flush_potentially_partial, Trace)
            ),
            wait_until_gauge_is(
                inflight,
                #{
                    expected_value => 0,
                    timeout => 500,
                    max_events => 10
                }
            ),
            Metrics = tap_metrics(?LINE),
            case QueryMode of
                async ->
                    ?assertMatch(
                        #{
                            counters := #{
                                matched := 4,
                                %% the block call, the request with
                                %% infinity timeout, and the resume
                                %% call.
                                success := 3,
                                dropped := 1,
                                'dropped.expired' := 1,
                                %% was sent successfully and held by
                                %% the test connector.
                                retried := 0,
                                failed := 0
                            }
                        },
                        Metrics
                    );
                sync ->
                    ?assertMatch(
                        #{
                            counters := #{
                                matched := 4,
                                %% the block call, the request with
                                %% infinity timeout, and the resume
                                %% call.
                                success := 3,
                                dropped := 1,
                                'dropped.expired' := 1,
                                %% currently, the test connector
                                %% replies with an error that may make
                                %% the buffer worker retry.
                                retried := Retried,
                                failed := 0
                            }
                        } when Retried =< 1,
                        Metrics
                    )
            end,
            ok
        end
    ),
    ok.

%% Async, non-batched: requests whose timeout elapses while their reply is
%% being handled are counted as late replies (see
%% do_t_expiration_async_after_reply).
t_expiration_async_after_reply(_Config) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 1,
            worker_pool_size => 1,
            resume_interval => 1_000
        }
    ),
    install_telemetry_handler(?FUNCTION_NAME),
    do_t_expiration_async_after_reply(single).
%% Async, batched (batch_size 3) variant of t_expiration_async_after_reply.
t_expiration_async_batch_after_reply(_Config) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 3,
            batch_time => 100,
            worker_pool_size => 1,
            resume_interval => 2_000
        }
    ),
    install_telemetry_handler(?FUNCTION_NAME),
    do_t_expiration_async_after_reply(batch).

%% Shared body for the "expires after reply" cases.  `IsBatch' is `batch'
%% or `single'.
%%
%% The scenario sends three async requests: 199 and 299 with a 100 ms
%% timeout, and 99 with no timeout.  Forced ordering delays handling of the
%% reply for the 199 request by 2 * TimeoutMS, so 199 and 299 have expired
%% when their replies are handled.  Expected outcome: 2 late replies and
%% 1 success.
do_t_expiration_async_after_reply(IsBatch) ->
    ?check_trace(
        begin
            %% number of `buffer_worker_flush_ack' events to let through
            %% before the delay helper may start (one ack per request when
            %% not batching).
            NAcks =
                case IsBatch of
                    batch -> 1;
                    single -> 3
                end,
            ?force_ordering(
                #{?snk_kind := buffer_worker_flush_ack},
                NAcks,
                #{?snk_kind := delay_enter},
                _Guard = true
            ),
            ?force_ordering(
                #{?snk_kind := delay},
                #{
                    ?snk_kind := handle_async_reply_enter,
                    batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
                }
            ),

            TimeoutMS = 100,
            ?assertEqual(
                ok,
                emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
            ),
            ?assertEqual(
                ok,
                emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
            ),
            ?assertEqual(
                ok,
                emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
            ),
            Pid0 =
                spawn_link(fun() ->
                    ?tp(delay_enter, #{}),
                    ct:sleep(2 * TimeoutMS),
                    ?tp(delay, #{}),
                    ok
                end),

            {ok, _} = ?block_until(
                #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
            ),
            {ok, _} = ?block_until(
                #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
            ),
            %% wait for the success of the infinite-timeout request
            wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
            unlink(Pid0),
            exit(Pid0, kill),
            ok
        end,
        fun(Trace) ->
            case IsBatch of
                batch ->
                    ?assertMatch(
                        [
                            #{
                                expired := [
                                    {query, _, {inc_counter, 199}, _, _},
                                    {query, _, {inc_counter, 299}, _, _}
                                ]
                            }
                        ],
                        ?of_kind(handle_async_reply_expired, Trace)
                    ),
                    ?assertMatch(
                        [
                            #{
                                inflight_count := 1,
                                num_inflight_messages := 1
                            }
                        ],
                        ?of_kind(handle_async_reply_partially_expired, Trace)
                    );
                single ->
                    ?assertMatch(
                        [
                            #{expired := [{query, _, {inc_counter, 199}, _, _}]},
                            #{expired := [{query, _, {inc_counter, 299}, _, _}]}
                        ],
                        ?of_kind(handle_async_reply_expired, Trace)
                    )
            end,
            Metrics = tap_metrics(?LINE),
            ?assertMatch(
                #{
                    counters := #{
                        matched := 3,
                        %% the request with infinity timeout.
                        success := 1,
                        dropped := 0,
                        late_reply := 2,
                        retried := 0,
                        failed := 0
                    }
                },
                Metrics
            ),
            ok
        end
    ),
    ok.

%% A batch in which every request expires while its reply is being handled:
%% both requests count as late replies, nothing succeeds, and the inflight
%% and queuing gauges end at 0.
t_expiration_batch_all_expired_after_reply(_Config) ->
    ResumeInterval = 300,
    emqx_connector_demo:set_callback_mode(async_if_possible),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => async,
            batch_size => 3,
            batch_time => 100,
            worker_pool_size => 1,
            resume_interval => ResumeInterval
        }
    ),
    ?check_trace(
        begin
            ?force_ordering(
                #{?snk_kind := buffer_worker_flush_ack},
                #{?snk_kind := delay_enter}
            ),
            ?force_ordering(
                #{?snk_kind := delay},
                #{
                    ?snk_kind := handle_async_reply_enter,
                    batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
                }
            ),

            TimeoutMS = 200,
            ?assertEqual(
                ok,
                emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
            ),
            ?assertEqual(
                ok,
                emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
            ),
            %% holds reply handling back for 2 * TimeoutMS
            Pid0 =
                spawn_link(fun() ->
                    ?tp(delay_enter, #{}),
                    ct:sleep(2 * TimeoutMS),
                    ?tp(delay, #{}),
                    ok
                end),

            {ok, _} = ?block_until(
                #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
            ),

            unlink(Pid0),
            exit(Pid0, kill),
            ok
        end,
        fun(Trace) ->
            ?assertMatch(
                [
                    #{
                        expired := [
                            {query, _, {inc_counter, 199}, _, _},
                            {query, _, {inc_counter, 299}, _, _}
                        ]
                    }
                ],
                ?of_kind(handle_async_reply_expired, Trace)
            ),
            Metrics = tap_metrics(?LINE),
            ?assertMatch(
                #{
                    counters := #{
                        matched := 2,
                        success := 0,
                        dropped := 0,
                        late_reply := 2,
                        retried := 0,
                        failed := 0
                    },
                    gauges := #{
                        inflight := 0,
                        queuing := 0
                    }
                },
                Metrics
            ),
            ok
        end
    ),
    ok.

%% Sync, non-batched: an inflight request that expires while being retried
%% is dropped (see do_t_expiration_retry).
t_expiration_retry(_Config) ->
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 1,
            worker_pool_size => 1,
            resume_interval => 300
        }
    ),
    do_t_expiration_retry().
%% Sync, batched (batch_size 2) variant of t_expiration_retry.
t_expiration_retry_batch(_Config) ->
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 2,
            batch_time => 100,
            worker_pool_size => 1,
            resume_interval => 300
        }
    ),
    do_t_expiration_retry().

%% Shared body for the retry-expiration cases.  While the connector is
%% blocked, a 100 ms-timeout request (`{inc_counter, 1}') is queued first
%% and an infinite-timeout one (`{inc_counter, 2}') second.  The first must
%% be reported by `buffer_worker_retry_expired'.  After `resume' the retry
%% succeeds and both gauges drain to 0.
%%
%% NOTE(review): `ResumeInterval' is hard-coded to 300 here and must match
%% the `resume_interval' both callers configure.
do_t_expiration_retry() ->
    ResumeInterval = 300,
    ?check_trace(
        begin
            ok = emqx_resource:simple_sync_query(?ID, block),

            {ok, SRef0} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
                1,
                200
            ),
            TimeoutMS = 100,
            %% the request that expires must be first, so it's the
            %% head of the inflight table (and retriable).
            {ok, SRef1} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := buffer_worker_appended_to_queue}),
                1,
                ResumeInterval * 2
            ),
            spawn_link(fun() ->
                ?assertMatch(
                    {error, {resource_error, #{reason := timeout}}},
                    emqx_resource:query(
                        ?ID,
                        {inc_counter, 1},
                        #{timeout => TimeoutMS}
                    )
                )
            end),
            %% held back by `go' until the first request has been queued,
            %% which fixes the queue order
            Pid1 =
                spawn_link(fun() ->
                    receive
                        go -> ok
                    end,
                    ?assertEqual(
                        ok,
                        emqx_resource:query(
                            ?ID,
                            {inc_counter, 2},
                            #{timeout => infinity}
                        )
                    )
                end),
            {ok, _} = snabbkaffe:receive_events(SRef1),
            Pid1 ! go,
            {ok, _} = snabbkaffe:receive_events(SRef0),

            {ok, _} =
                ?block_until(
                    #{?snk_kind := buffer_worker_retry_expired},
                    ResumeInterval * 10
                ),

            {ok, {ok, _}} =
                ?wait_async_action(
                    emqx_resource:simple_sync_query(?ID, resume),
                    #{?snk_kind := buffer_worker_retry_inflight_succeeded},
                    ResumeInterval * 5
                ),

            ok
        end,
        fun(Trace) ->
            ?assertMatch(
                [#{expired := [{query, _, {inc_counter, 1}, _, _}]}],
                ?of_kind(buffer_worker_retry_expired, Trace)
            ),
            Metrics = tap_metrics(?LINE),
            ?assertMatch(
                #{
                    gauges := #{
                        inflight := 0,
                        queuing := 0
                    }
                },
                Metrics
            ),
            ok
        end
    ),
    ok.
%% Two requests in one batch expire at different retry rounds: the first
%% times out after 100 ms, the second after ResumeInterval + 100 ms.  Two
%% separate `buffer_worker_retry_expired' events must be observed, one per
%% request, in that order.
t_expiration_retry_batch_multiple_times(_Config) ->
    ResumeInterval = 300,
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 2,
            batch_time => 100,
            worker_pool_size => 1,
            resume_interval => ResumeInterval
        }
    ),
    ?check_trace(
        begin
            ok = emqx_resource:simple_sync_query(?ID, block),

            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
                1,
                200
            ),
            TimeoutMS = 100,
            spawn_link(fun() ->
                ?assertMatch(
                    {error, {resource_error, #{reason := timeout}}},
                    emqx_resource:query(
                        ?ID,
                        {inc_counter, 1},
                        #{timeout => TimeoutMS}
                    )
                )
            end),
            spawn_link(fun() ->
                ?assertMatch(
                    {error, {resource_error, #{reason := timeout}}},
                    emqx_resource:query(
                        ?ID,
                        {inc_counter, 2},
                        #{timeout => ResumeInterval + TimeoutMS}
                    )
                )
            end),
            {ok, _} = snabbkaffe:receive_events(SRef),

            {ok, _} =
                snabbkaffe:block_until(
                    ?match_n_events(2, #{?snk_kind := buffer_worker_retry_expired}),
                    ResumeInterval * 10
                ),

            ok
        end,
        fun(Trace) ->
            ?assertMatch(
                [
                    #{expired := [{query, _, {inc_counter, 1}, _, _}]},
                    #{expired := [{query, _, {inc_counter, 2}, _, _}]}
                ],
                ?of_kind(buffer_worker_retry_expired, Trace)
            ),
            ok
        end
    ),
    ok.

%% Sync driver, batch of up to 5: each request in a batch gets its own
%% individual reply (see do_t_batch_individual_reply).
t_batch_individual_reply_sync(_Config) ->
    ResumeInterval = 300,
    emqx_connector_demo:set_callback_mode(always_sync),
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        #{
            query_mode => sync,
            batch_size => 5,
            batch_time => 100,
            worker_pool_size => 1,
            metrics_flush_interval => 50,
            resume_interval => ResumeInterval
        }
    ),
    do_t_batch_individual_reply().
%% Async-capable driver variant of t_batch_individual_reply_sync.
%% NOTE(review): `query_mode' is `sync' here even though the callback mode
%% is async; presumably deliberate (sync query over an async driver), but
%% confirm.
t_batch_individual_reply_async(_Config) ->
    ResumeInterval = 300,
    emqx_connector_demo:set_callback_mode(async_if_possible),
    ResourceOpts = #{
        query_mode => sync,
        batch_size => 5,
        batch_time => 100,
        worker_pool_size => 1,
        metrics_flush_interval => 50,
        resume_interval => ResumeInterval
    },
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        ResourceOpts
    ),
    on_exit(fun() -> emqx_resource:remove_local(?ID) end),
    do_t_batch_individual_reply().

%% Sends five `individual_reply' queries concurrently.  Even-numbered ones
%% ask for success and odd-numbered ones for failure.  The test waits for a
%% flush that carried at least two of them together, then checks that each
%% caller got its own reply (in submission order) and that the counters
%% reflect 2 successes and 3 failures.
do_t_batch_individual_reply() ->
    Ns = lists:seq(1, 5),
    WantsOk = fun(N) -> N rem 2 =:= 0 end,
    ExpectedReplies = [
        case WantsOk(I) of
            true -> ok;
            false -> {error, {unrecoverable_error, bad_request}}
        end
     || I <- Ns
    ],
    ?check_trace(
        begin
            {Results, {ok, _}} =
                ?wait_async_action(
                    emqx_utils:pmap(
                        fun(N) -> emqx_resource:query(?ID, {individual_reply, WantsOk(N)}) end,
                        Ns
                    ),
                    #{?snk_kind := buffer_worker_flush_ack, batch_or_query := [_, _ | _]},
                    5_000
                ),
            ?assertEqual(ExpectedReplies, Results),
            ?retry(
                200,
                10,
                ?assertMatch(
                    #{counters := #{matched := 5, failed := 3, success := 2}},
                    tap_metrics(?LINE)
                )
            ),
            ok
        end,
        []
    ),
    ok.

%% Non-batched: see do_t_recursive_flush.
t_recursive_flush(_Config) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    ResourceOpts = #{
        query_mode => async,
        batch_size => 1,
        worker_pool_size => 1
    },
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        ResourceOpts
    ),
    do_t_recursive_flush().

%% Batched variant (batch_size 2 and a long batch_time of 10 s).
t_recursive_flush_batch(_Config) ->
    emqx_connector_demo:set_callback_mode(async_if_possible),
    ResourceOpts = #{
        query_mode => async,
        batch_size => 2,
        batch_time => 10_000,
        worker_pool_size => 1
    },
    {ok, _} = create(
        ?ID,
        ?DEFAULT_RESOURCE_GROUP,
        ?TEST_RESOURCE,
        #{name => test_resource},
        ResourceOpts
    ),
    do_t_recursive_flush().
%% Floods the resource with `{inc_counter, 1}' queries from a busy-looping
%% helper until two `buffer_worker_flush_ack_reflush' events are seen.  It
%% then checks that at least one reflush is followed by a new
%% `buffer_worker_flush' (i.e. a recursive flush actually re-enters flush).
do_t_recursive_flush() ->
    ?check_trace(
        begin
            Timeout = 1_000,
            Pid = spawn_link(fun S() ->
                emqx_resource:query(?ID, {inc_counter, 1}),
                S()
            end),
            %% we want two reflushes to happen before we analyze the
            %% trace, so that we get a single full interaction
            {ok, _} = snabbkaffe:block_until(
                ?match_n_events(2, #{?snk_kind := buffer_worker_flush_ack_reflush}), Timeout
            ),
            unlink(Pid),
            exit(Pid, kill),
            ok
        end,
        fun(Trace) ->
            %% check that a recursive flush leads to a new call to flush/1
            Pairs = ?find_pairs(
                #{?snk_kind := buffer_worker_flush_ack_reflush},
                #{?snk_kind := buffer_worker_flush},
                Trace
            ),
            IsPair = fun
                ({pair, _, _}) -> true;
                (_) -> false
            end,
            ?assert(lists:any(IsPair, Pairs))
        end
    ),
    ok.

%% The buffer worker's call mode must be independent of the query mode: an
%% async query can drive a sync-only connector, and a sync query against an
%% async-capable connector must use the async call path.
t_call_mode_uncoupled_from_query_mode(_Config) ->
    DefaultOpts = #{
        batch_size => 1,
        batch_time => 5,
        worker_pool_size => 1
    },
    ?check_trace(
        begin
            %% We check that we can call the buffer workers with async
            %% calls, even if the underlying connector itself only
            %% supports sync calls.
            emqx_connector_demo:set_callback_mode(always_sync),
            {ok, _} = create(
                ?ID,
                ?DEFAULT_RESOURCE_GROUP,
                ?TEST_RESOURCE,
                #{name => test_resource},
                DefaultOpts#{query_mode => async}
            ),
            ?tp_span(
                async_query_sync_driver,
                #{},
                ?assertMatch(
                    {ok, {ok, _}},
                    ?wait_async_action(
                        emqx_resource:query(?ID, {inc_counter, 1}),
                        #{?snk_kind := buffer_worker_flush_ack},
                        500
                    )
                )
            ),
            ?assertEqual(ok, emqx_resource:remove_local(?ID)),

            %% And we check the converse: a connector that allows async
            %% calls can be called synchronously, but the underlying
            %% call should be async.
            emqx_connector_demo:set_callback_mode(async_if_possible),
            {ok, _} = create(
                ?ID,
                ?DEFAULT_RESOURCE_GROUP,
                ?TEST_RESOURCE,
                #{name => test_resource},
                DefaultOpts#{query_mode => sync}
            ),
            ?tp_span(
                sync_query_async_driver,
                #{},
                ?assertEqual(ok, emqx_resource:query(?ID, {inc_counter, 2}))
            ),
            ?assertEqual(ok, emqx_resource:remove_local(?ID)),
            %% NOTE(review): this extra tracepoint lies outside the span
            %% above and is not asserted on below; confirm it is still needed.
            ?tp(sync_query_async_driver, #{}),
            ok
        end,
        fun(Trace0) ->
            Trace1 = trace_between_span(Trace0, async_query_sync_driver),
            ct:pal("async query calling sync driver\n ~p", [Trace1]),
            ?assert(
                ?strict_causality(
                    #{?snk_kind := async_query, request := {inc_counter, 1}},
                    #{?snk_kind := call_query, call_mode := sync},
                    Trace1
                )
            ),

            Trace2 = trace_between_span(Trace0, sync_query_async_driver),
            ct:pal("sync query calling async driver\n ~p", [Trace2]),
            ?assert(
                ?strict_causality(
                    #{?snk_kind := sync_query, request := {inc_counter, 2}},
                    #{?snk_kind := call_query_async},
                    Trace2
                )
            ),

            ok
        end
    ).

%% The default mode is currently `memory_only'.
%%
%% Creates and removes a `volatile_offload' resource four times with
%% different `buffer_seg_bytes' settings.  It then checks the replayq
%% options reported by `buffer_worker_init': the segment size defaults to,
%% and is capped at, the max buffer size.
t_volatile_offload_mode(_Config) ->
    MaxBufferBytes = 1_000,
    DefaultOpts = #{
        max_buffer_bytes => MaxBufferBytes,
        worker_pool_size => 1
    },
    %% The order of this list matters: the trace check below expects the
    %% `buffer_worker_init' events in exactly this order.
    OptVariants = [
        %% no segment bytes specified; should default to max bytes
        #{buffer_mode => volatile_offload},
        %% segment bytes < max bytes
        #{buffer_mode => volatile_offload, buffer_seg_bytes => MaxBufferBytes div 2},
        %% segment bytes = max bytes
        #{buffer_mode => volatile_offload, buffer_seg_bytes => MaxBufferBytes},
        %% segment bytes > max bytes; should normalize to max bytes
        #{buffer_mode => volatile_offload, buffer_seg_bytes => 2 * MaxBufferBytes}
    ],
    ?check_trace(
        begin
            emqx_connector_demo:set_callback_mode(async_if_possible),
            lists:foreach(
                fun(Variant) ->
                    ?assertMatch(
                        {ok, _},
                        create(
                            ?ID,
                            ?DEFAULT_RESOURCE_GROUP,
                            ?TEST_RESOURCE,
                            #{name => test_resource},
                            maps:merge(DefaultOpts, Variant)
                        )
                    ),
                    ?assertEqual(ok, emqx_resource:remove_local(?ID))
                end,
                OptVariants
            ),
            ok
        end,
        fun(Trace) ->
            HalfMaxBufferBytes = MaxBufferBytes div 2,
            ?assertMatch(
                [
                    #{
                        dir := _,
                        max_total_bytes := MaxTotalBytes,
                        seg_bytes := MaxTotalBytes,
                        offload := {true, volatile}
                    },
                    #{
                        dir := _,
                        max_total_bytes := MaxTotalBytes,
                        %% uses the specified value since it's smaller
                        %% than max bytes.
                        seg_bytes := HalfMaxBufferBytes,
                        offload := {true, volatile}
                    },
                    #{
                        dir := _,
                        max_total_bytes := MaxTotalBytes,
                        seg_bytes := MaxTotalBytes,
                        offload := {true, volatile}
                    },
                    #{
                        dir := _,
                        max_total_bytes := MaxTotalBytes,
                        seg_bytes := MaxTotalBytes,
                        offload := {true, volatile}
                    }
                ],
                ?projection(queue_opts, ?of_kind(buffer_worker_init, Trace))
            ),
            ok
        end
    ).
%% A sync query that times out (after `RequestTTL' ms) must not leave a late reply
%% message in the calling process' mailbox, even though the connector still sends
%% its reply once its sleep is over.
t_late_call_reply(_Config) ->
    emqx_connector_demo:set_callback_mode(always_sync),
    RequestTTL = 500,
    ?assertMatch(
        {ok, _},
        create(
            ?ID,
            ?DEFAULT_RESOURCE_GROUP,
            ?TEST_RESOURCE,
            #{name => test_resource},
            #{
                buffer_mode => memory_only,
                request_ttl => RequestTTL,
                query_mode => sync
            }
        )
    ),
    ?check_trace(
        begin
            %% Sleep for longer than the request timeout; the call reply will
            %% have been already returned (a timeout), but the resource will
            %% still send a message with the reply.
            %% The demo connector will reply with `{error, timeout}' after 1 s.
            SleepFor = RequestTTL + 500,
            ?assertMatch(
                {error, {resource_error, #{reason := timeout}}},
                emqx_resource:query(
                    ?ID,
                    {sync_sleep_before_reply, SleepFor},
                    #{timeout => RequestTTL}
                )
            ),
            %% Our process shouldn't receive any late messages.
            %% Note: this matches ANY message, not only replies, and waits
            %% `SleepFor' ms so that the connector's late reply would have
            %% arrived by then if it were going to leak.
            receive
                LateReply ->
                    ct:fail("received late reply: ~p", [LateReply])
            after SleepFor ->
                ok
            end,
            ok
        end,
        []
    ),
    ok.

%% A resource configured with `create_error => true' must activate its alarm exactly
%% once, even though the start error (`connector_demo_start_error') keeps recurring.
t_resource_create_error_activate_alarm_once(_) ->
    do_t_resource_activate_alarm_once(
        #{name => test_resource, create_error => true},
        connector_demo_start_error
    ).

%% Same as above, but the failure comes from the health check
%% (`health_check_error => true' / `connector_demo_health_check_error').
t_resource_health_check_error_activate_alarm_once(_) ->
    do_t_resource_activate_alarm_once(
        #{name => test_resource, health_check_error => true},
        connector_demo_health_check_error
    ).

%% Shared body for the alarm tests above.
%% Creates the demo resource with `ResourceConfig' and a 100 ms health check
%% interval, waits for the `resource_activate_alarm' event for `?ID', and checks
%% the alarm is listed as activated. It then waits (subscription timeout 7000 ms)
%% for 4 `SubscribeEvent' events. Finally, the trace check asserts that exactly
%% one `resource_activate_alarm' event was emitted overall.
do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
    ?check_trace(
        begin
            %% NOTE(review): the value returned by `?wait_async_action' (which
            %% includes the result of `create/5') is discarded here.
            ?wait_async_action(
                create(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    ResourceConfig,
                    #{health_check_interval => 100}
                ),
                #{?snk_kind := resource_activate_alarm, resource_id := ?ID}
            ),
            ?assertMatch([#{activated := true, name := ?ID}], emqx_alarm:get_alarms(activated)),
            %% Let the failure repeat several times; the alarm must not be
            %% re-activated for each occurrence.
            {ok, SubRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := SubscribeEvent}),
                4,
                7000
            ),
            ?assertMatch({ok, [_, _, _, _]}, snabbkaffe:receive_events(SubRef))
        end,
        fun(Trace) ->
            ?assertMatch([_], ?of_kind(resource_activate_alarm, Trace))
        end
    ).
%% A crash inside a telemetry handler must not get that handler uninstalled.
%% This can happen, for instance, when a busy resource is restarted and an
%% increment is handled before its metrics have been recreated.
t_telemetry_handler_crash(_Config) ->
    ?check_trace(
        begin
            MissingResourceId = <<"I-dont-exist">>,
            Worker = 1,
            Prefix = [?TELEMETRY_PREFIX],
            InitialHandlers = telemetry:list_handlers(Prefix),
            ?assertMatch([_ | _], InitialHandlers),
            %% Counter increments for a resource that has no metrics at all.
            [IncFn(MissingResourceId) || IncFn <- counter_metric_inc_fns()],
            %% Gauge updates while the metrics worker's `set_gauge' raises.
            CrashingSetGauge =
                fun(_Name, _Id, _WorkerId, _Metric, _Val) -> error(random_crash) end,
            SetEveryGauge = fun() ->
                lists:foreach(
                    fun(SetFn) -> SetFn(MissingResourceId, Worker, 1) end,
                    gauge_metric_set_fns()
                )
            end,
            emqx_common_test_helpers:with_mock(
                emqx_metrics_worker,
                set_gauge,
                CrashingSetGauge,
                SetEveryGauge
            ),
            %% The very same handlers must still be attached afterwards.
            ?assertEqual(InitialHandlers, telemetry:list_handlers(Prefix)),
            ok
        end,
        []
    ),
    ok.

%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------

%% Fires `N' concurrent `{inc_counter, 1}' queries against `?ID' with no options.
inc_counter_in_parallel(N) ->
    inc_counter_in_parallel(N, #{}).

%% Fires `N' concurrent `{inc_counter, 1}' queries against `?ID' using `Opts0'.
inc_counter_in_parallel(N, Opts0) ->
    inc_counter_in_parallel(N, {inc_counter, 1}, Opts0).

%% Spawns `N' processes that each run `Query' against `?ID' with `Opts'. Either
%% argument may be a 0-arity fun, which is evaluated inside the spawned process.
%% Each process is awaited for at most 1000 ms before the test fails. Returns `ok'.
inc_counter_in_parallel(N, Query, Opts) ->
    Caller = self(),
    SpawnQuerier = fun(_) ->
        erlang:spawn(fun() ->
            emqx_resource:query(?ID, maybe_apply(Query), maybe_apply(Opts)),
            Caller ! {complete, self()}
        end)
    end,
    AwaitQuerier = fun(Querier) ->
        receive
            {complete, Querier} -> ok
        after 1000 ->
            ct:fail({wait_for_query_timeout, Querier})
        end
    end,
    Queriers = lists:map(SpawnQuerier, lists:seq(1, N)),
    lists:foreach(AwaitQuerier, Queriers),
    ok.

%% Spawns `N' processes issuing `{inc_counter, M}' for M in StartN..StartN+N-1,
%% then awaits each one (at most 1000 ms apiece). Returns one `ok' per process.
inc_counter_in_parallel_increasing(N, StartN, Opts) ->
    Caller = self(),
    Queriers = lists:map(
        fun(M) ->
            erlang:spawn(fun() ->
                emqx_resource:query(?ID, {inc_counter, M}, maybe_apply(Opts)),
                Caller ! {complete, self()}
            end)
        end,
        lists:seq(StartN, StartN + N - 1)
    ),
    lists:map(
        fun(Querier) ->
            receive
                {complete, Querier} -> ok
            after 1000 ->
                ct:fail({wait_for_query_timeout, Querier})
            end
        end,
        Queriers
    ).

%% Evaluates a 0-arity fun, or returns any other term unchanged.
maybe_apply(FunOrTerm) ->
    maybe_apply(FunOrTerm, []).
%% Calls `Fun' with `Args' when given a fun; any other term is returned as-is.
maybe_apply(Fun, Args) when is_function(Fun) ->
    erlang:apply(Fun, Args);
maybe_apply(Term, _Args) ->
    Term.

%% Raw HOCON text for a minimal demo resource config.
bin_config() ->
    <<"\"name\": \"test_resource\"">>.

%% `bin_config/0' parsed with `hocon:binary/1'; crashes if parsing fails.
config() ->
    {ok, Config} = hocon:binary(bin_config()),
    Config.

%% Logs the counters and gauges of resource `?ID', labelled with `Line', and
%% returns them as `#{counters => _, gauges => _}'.
tap_metrics(Line) ->
    #{counters := C, gauges := G} = emqx_resource:get_metrics(?ID),
    ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
    #{counters => C, gauges => G}.

%% Attaches a telemetry handler (id `TestCase') to `emqx_resource_metrics:events()'.
%% Each event is recorded in a fresh public `ordered_set' ETS table and also sent
%% to the calling process as `{telemetry, Data}'. The handler is detached and the
%% table deleted via `on_exit/1'. The table id is stored in the process dictionary
%% under `{?MODULE, telemetry_table}' and returned.
install_telemetry_handler(TestCase) ->
    Tid = ets:new(TestCase, [ordered_set, public]),
    HandlerId = TestCase,
    TestPid = self(),
    _ = telemetry:attach_many(
        HandlerId,
        emqx_resource_metrics:events(),
        fun(EventName, Measurements, Metadata, _Config) ->
            Data = #{
                name => EventName,
                measurements => Measurements,
                metadata => Metadata
            },
            %% `erlang:monotonic_time/0' is monotonic but not strictly increasing.
            %% Two events recorded in the same tick would share a key, and the
            %% later one would overwrite the earlier one in this `ordered_set'.
            %% Pairing the time with a monotonic unique integer keeps keys unique
            %% while still ordering them by time.
            Key = {erlang:monotonic_time(), erlang:unique_integer([monotonic])},
            ets:insert(Tid, {Key, Data}),
            TestPid ! {telemetry, Data},
            ok
        end,
        unused_config
    ),
    on_exit(fun() ->
        telemetry:detach(HandlerId),
        ets:delete(Tid)
    end),
    put({?MODULE, telemetry_table}, Tid),
    Tid.

%% Drains up to `MaxEvents' telemetry messages for `GaugeName' (each awaited for at
%% most `Timeout' ms). Fails the test unless the last one set the gauge to
%% `ExpectedValue'. If no event arrives at all, it only logs and returns `ok'.
%% NOTE(review): "no events" is not treated as a failure; confirm that is intended.
wait_until_gauge_is(
    GaugeName,
    #{
        expected_value := ExpectedValue,
        timeout := Timeout,
        max_events := MaxEvents
    }
) ->
    case receive_all_events(GaugeName, Timeout, MaxEvents) of
        [] ->
            ct:pal("no ~p gauge events received!", [GaugeName]);
        Events ->
            case lists:last(Events) of
                #{measurements := #{gauge_set := ExpectedValue}} ->
                    ok;
                #{measurements := #{gauge_set := Value}} ->
                    ct:fail(
                        "gauge ~p didn't reach expected value ~p; last value: ~p",
                        [GaugeName, ExpectedValue, Value]
                    )
            end
    end.

%% Like `receive_all_events/3' with at most 50 events.
receive_all_events(EventName, Timeout) ->
    receive_all_events(EventName, Timeout, _MaxEvents = 50).

%% Collects up to `MaxEvents' telemetry messages for `EventName', in arrival order.
receive_all_events(EventName, Timeout, MaxEvents) ->
    receive_all_events(EventName, Timeout, MaxEvents, _Count = 0, _Acc = []).
%% Accumulator loop for `receive_all_events/2,3'. It stops after `MaxEvents' matching
%% messages, or when none arrives within `Timeout' ms. Events are returned in arrival
%% order; only messages whose event name ends in `EventName' are consumed.
receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
    lists:reverse(Acc);
receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
    receive
        {telemetry, #{name := [_, _, EventName]} = Event} ->
            ct:pal("telemetry event: ~p", [Event]),
            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
    after Timeout ->
        lists:reverse(Acc)
    end.

%% Waits for a single `EventName' telemetry event (5000 ms timeout).
wait_telemetry_event(EventName) ->
    wait_telemetry_event(EventName, #{}).

%% Waits for `n_events' (default 1) telemetry events named `EventName'. Each one is
%% awaited for at most `timeout' ms (default 5000). Raises
%% `{timeout_waiting_for_telemetry, EventName}' on timeout.
wait_telemetry_event(
    EventName,
    Opts0
) ->
    DefaultOpts = #{timeout => 5_000, n_events => 1},
    #{timeout := Timeout, n_events := NEvents} = maps:merge(DefaultOpts, Opts0),
    wait_n_events(NEvents, Timeout, EventName).

wait_n_events(NEvents, _Timeout, _EventName) when NEvents =< 0 ->
    ok;
wait_n_events(NEvents, Timeout, EventName) ->
    receive
        {telemetry, #{name := [_, _, EventName]}} ->
            wait_n_events(NEvents - 1, Timeout, EventName)
    after Timeout ->
        %% Dump what was recorded for debugging, then raise the timeout error.
        %% The dump must not crash when no handler was installed; otherwise it
        %% would mask this error.
        log_recorded_telemetry_events(),
        error({timeout_waiting_for_telemetry, EventName})
    end.

%% Logs the contents of the table created by `install_telemetry_handler/1' for the
%% current process, if there is one.
log_recorded_telemetry_events() ->
    case get({?MODULE, telemetry_table}) of
        undefined ->
            ct:pal("no telemetry table installed; no recorded events to show");
        TelemetryTable ->
            RecordedEvents = ets:tab2list(TelemetryTable),
            ct:pal("recorded events: ~p", [RecordedEvents])
    end.

%% Sync mode: a flush nack must be followed by a failed inflight retry, which
%% must eventually be followed by a successful inflight retry.
assert_sync_retry_fail_then_succeed_inflight(Trace) ->
    ct:pal(" ~p", [Trace]),
    ?assert(
        ?strict_causality(
            #{?snk_kind := buffer_worker_flush_nack, ref := _Ref},
            #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
            Trace
        )
    ),
    %% not strict causality because it might retry more than once
    %% before restoring the resource health.
    ?assert(
        ?causality(
            #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
            #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
            Trace
        )
    ),
    ok.

%% Async mode: a nack handled in `handle_async_reply' must be followed by a failed
%% inflight retry, which must eventually be followed by a successful inflight retry.
assert_async_retry_fail_then_succeed_inflight(Trace) ->
    ct:pal(" ~p", [Trace]),
    ?assert(
        ?strict_causality(
            #{?snk_kind := handle_async_reply, action := nack},
            #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
            Trace
        )
    ),
    %% not strict causality because it might retry more than once
    %% before restoring the resource health.
    ?assert(
        ?causality(
            #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
            #{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
            Trace
        )
    ),
    ok.

%% Returns the trace events strictly between the `start' and `complete' events of
%% the `Marker' span. Crashes (badmatch) if either span boundary is missing.
trace_between_span(Trace0, Marker) ->
    {Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0),
    {[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1),
    Trace2.

%% Blocks until `buffer_worker_async_agent_down' events report at least
%% `NumExpected' affected requests in total.
wait_until_all_marked_as_retriable(NumExpected) when NumExpected =< 0 ->
    ok;
wait_until_all_marked_as_retriable(NumExpected) ->
    Seen = #{},
    do_wait_until_all_marked_as_retriable(NumExpected, Seen).

%% `Seen' holds the pids whose event was already counted, so that the same event
%% is not matched again by `?block_until'. Each wait times out after 10 s.
do_wait_until_all_marked_as_retriable(NumExpected, _Seen) when NumExpected =< 0 ->
    ok;
do_wait_until_all_marked_as_retriable(NumExpected, Seen) ->
    Res = ?block_until(
        #{?snk_kind := buffer_worker_async_agent_down, ?snk_meta := #{pid := P}} when
            not is_map_key(P, Seen),
        10_000
    ),
    case Res of
        {timeout, Evts} ->
            ct:pal("events so far:\n ~p", [Evts]),
            ct:fail("timeout waiting for events");
        {ok, #{num_affected := NumAffected, ?snk_meta := #{pid := Pid}}} ->
            ct:pal("affected: ~p; pid: ~p", [NumAffected, Pid]),
            case NumAffected >= NumExpected of
                true ->
                    ok;
                false ->
                    do_wait_until_all_marked_as_retriable(NumExpected - NumAffected, Seen#{
                        Pid => true
                    })
            end
    end.

%% All `emqx_resource_metrics' counter increment functions (`*_inc/1').
counter_metric_inc_fns() ->
    metric_fns_with_suffix("_inc", 1).

%% All `emqx_resource_metrics' gauge setter functions (`*_set/3').
gauge_metric_set_fns() ->
    metric_fns_with_suffix("_set", 3).

%% Funs for every `emqx_resource_metrics' function of arity `Arity' whose name
%% ends with `Suffix', in `module_info(functions)' order.
metric_fns_with_suffix(Suffix, Arity) ->
    Mod = emqx_resource_metrics,
    [
        fun Mod:Fn/Arity
     || {Fn, FnArity} <- Mod:module_info(functions),
        FnArity =:= Arity,
        lists:suffix(Suffix, atom_to_list(Fn))
    ].

create(Id, Group, Type, Config) ->
    emqx_resource:create_local(Id, Group, Type, Config).

create(Id, Group, Type, Config, Opts) ->
    emqx_resource:create_local(Id, Group, Type, Config, Opts).