emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl

3376 lines
108 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TEST_RESOURCE, emqx_connector_demo).
-define(ID, <<"id">>).
-define(ID1, <<"id1">>).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
-define(TELEMETRY_PREFIX, emqx, resource).
-import(emqx_common_test_helpers, [on_exit/1]).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_testcase(_, Config) ->
ct:timetrap({seconds, 30}),
emqx_connector_demo:set_callback_mode(always_sync),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_, _Config) ->
snabbkaffe:stop(),
_ = emqx_resource:remove_local(?ID),
emqx_common_test_helpers:call_janitor(),
ok.
init_per_suite(Config) ->
code:ensure_loaded(?TEST_RESOURCE),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
{ok, _} = application:ensure_all_started(emqx_resource),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
t_list_types(_) ->
?assert(lists:member(?TEST_RESOURCE, emqx_resource:list_types())).
t_check_config(_) ->
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, bin_config()),
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, config()),
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, <<"not a config">>),
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}).
t_create_remove(_) ->
?check_trace(
begin
?assertMatch(
{error, _},
emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
)
),
?assertMatch(
{ok, _},
emqx_resource:recreate_local(
?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
)
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
?assertMatch(ok, emqx_resource:remove_local(?ID)),
?assertNot(is_process_alive(Pid))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_create_remove_local(_) ->
?check_trace(
begin
?assertMatch(
{error, _},
emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
)
),
emqx_resource:recreate_local(
?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
emqx_resource:set_resource_status_connecting(?ID),
emqx_resource:recreate_local(
?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
?assertMatch(ok, emqx_resource:remove_local(?ID)),
?assertMatch(
{error, not_found},
emqx_resource:query(?ID, get_state)
),
?assertNot(is_process_alive(Pid))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_do_not_start_after_created(_) ->
?check_trace(
begin
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{start_after_created => false}
)
),
%% the resource should remain `disconnected` after created
timer:sleep(200),
?assertMatch(
?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state)
),
?assertMatch(
{ok, _, #{status := stopped}},
emqx_resource:get_instance(?ID)
),
%% start the resource manually..
?assertEqual(ok, emqx_resource:start(?ID)),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
%% restart the resource
?assertEqual(ok, emqx_resource:restart(?ID)),
?assertNot(is_process_alive(Pid)),
{ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid2)),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
?assertNot(is_process_alive(Pid2))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_query(_) ->
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
),
{ok, #{pid := _}} = emqx_resource:query(?ID, get_state),
?assertMatch(
{error, not_found},
emqx_resource:query(<<"unknown">>, get_state)
),
ok = emqx_resource:remove_local(?ID).
t_query_counter(_) ->
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true}
),
{ok, 0} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:query(?ID, {inc_counter, 1}),
{ok, 1} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:query(?ID, {inc_counter, 5}),
{ok, 6} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
t_batch_query_counter(_) ->
BatchSize = 100,
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{
batch_size => BatchSize,
batch_time => 100,
query_mode => sync
}
),
?check_trace(
?TRACE_OPTS,
emqx_resource:query(?ID, get_counter),
fun(Result, Trace) ->
?assertMatch({ok, 0}, Result),
QueryTrace = ?of_kind(call_batch_query, Trace),
?assertMatch([#{batch := [{query, _, get_counter, _, _}]}], QueryTrace)
end
),
NMsgs = 1_000,
?check_trace(
?TRACE_OPTS,
begin
NEvents = round(math:ceil(NMsgs / BatchSize)),
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter}),
NEvents,
_Timeout = 10_000
),
inc_counter_in_parallel(NMsgs),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
QueryTrace = [
Event
|| Event = #{
?snk_kind := call_batch_query,
batch := BatchReq
} <- Trace,
length(BatchReq) > 1
],
?assertMatch([_ | _], QueryTrace)
end
),
{ok, NMsgs} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
t_query_counter_async_query(_) ->
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{
query_mode => async,
batch_size => 1,
metrics_flush_interval => 50
}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
NMsgs = 1_000,
?check_trace(
?TRACE_OPTS,
begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter}),
NMsgs,
_Timeout = 60_000
),
inc_counter_in_parallel(NMsgs),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
%% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
%% simple query ignores the query_mode and batching settings in the resource_worker
?check_trace(
?TRACE_OPTS,
emqx_resource:simple_sync_query(?ID, get_counter),
fun(Result, Trace) ->
?assertMatch({ok, 1000}, Result),
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
end
),
#{counters := C} = emqx_resource:get_metrics(?ID),
?retry(
_Sleep = 300,
_Attempts0 = 20,
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C)
),
ok = emqx_resource:remove_local(?ID).
t_query_counter_async_callback(_) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
Insert = fun(Tab, Result) ->
ets:insert(Tab, {make_ref(), Result})
end,
ReqOpts = #{async_reply_fun => {Insert, [Tab0]}},
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{
query_mode => async,
batch_size => 1,
inflight_window => 1000000
}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
NMsgs = 1_000,
?check_trace(
?TRACE_OPTS,
begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
NMsgs,
_Timeout = 60_000
),
inc_counter_in_parallel(NMsgs, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
%% simple query ignores the query_mode and batching settings in the resource_worker
?check_trace(
?TRACE_OPTS,
emqx_resource:simple_sync_query(?ID, get_counter),
fun(Result, Trace) ->
?assertMatch({ok, 1000}, Result),
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
end
),
#{counters := C} = emqx_resource:get_metrics(?ID),
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
?assertMatch(1000, ets:info(Tab0, size)),
?assert(
lists:all(
fun
({_, ok}) -> true;
(_) -> false
end,
ets:tab2list(Tab0)
)
),
ok = emqx_resource:remove_local(?ID).
t_query_counter_async_inflight(_) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
ok = telemetry:attach_many(
?FUNCTION_NAME,
emqx_resource_metrics:events(),
fun(Event, Measurements, Meta, _Config) ->
ets:insert(
MetricsTab,
{erlang:monotonic_time(), #{
event => Event, measurements => Measurements, metadata => Meta
}}
),
ok
end,
unused_config
),
on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
Insert0 = fun(Tab, Ref, Result) ->
ct:pal("inserting ~p", [{Ref, Result}]),
ets:insert(Tab, {Ref, Result})
end,
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
WindowSize = 15,
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{
query_mode => async,
batch_size => 1,
inflight_window => WindowSize,
worker_pool_size => 1,
resume_interval => 300
}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
%% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% send async query to make the inflight window full
?check_trace(
{_, {ok, _}} =
?wait_async_action(
%% one more so that inflight would be already full upon last query
inc_counter_in_parallel(WindowSize + 1, ReqOpts),
#{?snk_kind := buffer_worker_flush_but_inflight_full},
1_000
),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
tap_metrics(?LINE),
?assertMatch(0, ets:info(Tab0, size)),
tap_metrics(?LINE),
%% send query now will fail because the resource is blocked.
Insert = fun(Tab, Ref, Result) ->
ct:pal("inserting ~p", [{Ref, Result}]),
ets:insert(Tab, {Ref, Result}),
?tp(tmp_query_inserted, #{})
end,
%% since this counts as a failure, it'll be enqueued and retried
%% later, when the resource is unblocked.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 99}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]}
}),
#{?snk_kind := buffer_worker_appended_to_queue},
1_000
),
tap_metrics(?LINE),
%% all responses should be received after the resource is resumed.
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
%% +2 because the tmp_query above will be retried and succeed
%% this time.
WindowSize + 2,
_Timeout0 = 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE),
{ok, _} = snabbkaffe:receive_events(SRef0),
tap_metrics(?LINE),
%% since the previous tmp_query was enqueued to be retried, we
%% take it again from the table; this time, it should have
%% succeeded.
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
%% send async query, this time everything should be ok.
Num = 10,
?check_trace(
begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
Num,
_Timeout0 = 10_000
),
inc_counter_in_parallel_increasing(Num, 1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE),
ok
end
),
%% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
{_, {ok, _}} =
?wait_async_action(
%% one more so that inflight would be already full upon last query
inc_counter_in_parallel(WindowSize + 1, ReqOpts),
#{?snk_kind := buffer_worker_flush_but_inflight_full},
1_000
),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
%% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 4}),
Sent = WindowSize + 1 + Num + WindowSize + 1,
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize + 1,
_Timeout0 = 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1),
?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
?assert(Sent =< Counter),
%% give the metrics some time to stabilize.
ct:sleep(1000),
#{counters := C, gauges := G} = tap_metrics(?LINE),
?assertMatch(
#{
counters :=
#{matched := M, success := Ss, dropped := Dp},
gauges := #{queuing := Qing, inflight := Infl}
} when
M == Ss + Dp + Qing + Infl,
#{counters => C, gauges => G},
#{
metrics => #{counters => C, gauges => G},
results => ets:tab2list(Tab0),
metrics_trace => ets:tab2list(MetricsTab)
}
),
?assert(
lists:all(
fun
({_, ok}) -> true;
(_) -> false
end,
ets:tab2list(Tab0)
)
),
ok = emqx_resource:remove_local(?ID).
t_query_counter_async_inflight_batch(_) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
ok = telemetry:attach_many(
?FUNCTION_NAME,
emqx_resource_metrics:events(),
fun(Event, Measurements, Meta, _Config) ->
ets:insert(
MetricsTab,
{erlang:monotonic_time(), #{
event => Event, measurements => Measurements, metadata => Meta
}}
),
ok
end,
unused_config
),
on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
Insert0 = fun(Tab, Ref, Result) ->
ct:pal("inserting ~p", [{Ref, Result}]),
ets:insert(Tab, {Ref, Result})
end,
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
BatchSize = 2,
WindowSize = 15,
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{
query_mode => async,
batch_size => BatchSize,
batch_time => 100,
inflight_window => WindowSize,
worker_pool_size => 1,
resume_interval => 300
}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
%% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% send async query to make the inflight window full
NumMsgs = BatchSize * WindowSize,
?check_trace(
{_, {ok, _}} =
?wait_async_action(
%% a batch more so that inflight would be already full upon last query
inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
#{?snk_kind := buffer_worker_flush_but_inflight_full},
5_000
),
fun(Trace) ->
QueryTrace = [
Event
|| Event = #{
?snk_kind := call_batch_query_async,
batch := [
{query, _, {inc_counter, 1}, _, _},
{query, _, {inc_counter, 1}, _, _}
]
} <-
Trace
],
?assertMatch([_ | _], QueryTrace)
end
),
tap_metrics(?LINE),
Sent1 = NumMsgs + BatchSize,
?check_trace(
begin
%% this will block the resource_worker as the inflight window is full now
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 2}, ReqOpts()),
#{?snk_kind := buffer_worker_flush_but_inflight_full},
5_000
),
?assertMatch(0, ets:info(Tab0, size)),
ok
end,
[]
),
Sent2 = Sent1 + 1,
tap_metrics(?LINE),
%% send query now will fail because the resource is blocked.
Insert = fun(Tab, Ref, Result) ->
ct:pal("inserting ~p", [{Ref, Result}]),
ets:insert(Tab, {Ref, Result}),
?tp(tmp_query_inserted, #{})
end,
%% since this counts as a failure, it'll be enqueued and retried
%% later, when the resource is unblocked.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 3}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]}
}),
#{?snk_kind := buffer_worker_appended_to_queue},
1_000
),
tap_metrics(?LINE),
%% all responses should be received after the resource is resumed.
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
%% +2 because the tmp_query above will be retried and succeed
%% this time.
WindowSize + 2,
5_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE),
{ok, _} = snabbkaffe:receive_events(SRef0),
%% since the previous tmp_query was enqueued to be retried, we
%% take it again from the table; this time, it should have
%% succeeded.
?assertEqual([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
?assertEqual(Sent2, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE),
%% send async query, this time everything should be ok.
NumBatches1 = 3,
NumMsgs1 = BatchSize * NumBatches1,
?check_trace(
?TRACE_OPTS,
begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
NumBatches1,
5_000
),
inc_counter_in_parallel(NumMsgs1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
[#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
QueryTrace
)
end
),
Sent3 = Sent2 + NumMsgs1,
?assertEqual(Sent3, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE),
%% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
{_, {ok, _}} =
?wait_async_action(
%% a batch more so that inflight would be already full upon last query
inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
#{?snk_kind := buffer_worker_flush_but_inflight_full},
5_000
),
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
[#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
QueryTrace
)
end
),
Sent4 = Sent3 + NumMsgs + BatchSize,
%% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 1}),
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize + 1,
5_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1),
?assertEqual(Sent4, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent4]),
?assert(Sent4 =< Counter),
%% give the metrics some time to stabilize.
ct:sleep(1000),
#{counters := C, gauges := G} = tap_metrics(?LINE),
?assertMatch(
#{
counters :=
#{matched := M, success := Ss, dropped := Dp},
gauges := #{queuing := Qing, inflight := Infl}
} when
M == Ss + Dp + Qing + Infl,
#{counters => C, gauges => G},
#{
metrics => #{counters => C, gauges => G},
results => ets:tab2list(Tab0),
metrics_trace => ets:tab2list(MetricsTab)
}
),
?assert(
lists:all(
fun
({_, ok}) -> true;
(_) -> false
end,
ets:tab2list(Tab0)
)
),
ok = emqx_resource:remove_local(?ID).
t_healthy_timeout(_) ->
?check_trace(
begin
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"bad_not_atom_name">>, register => true},
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
#{health_check_interval => 200}
)
),
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(?ID, get_state, #{timeout => 1_000})
),
?assertMatch(
{ok, _Group, #{status := disconnected}}, emqx_resource_manager:lookup(?ID)
),
?assertEqual(ok, emqx_resource:remove_local(?ID))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_healthy(_) ->
?check_trace(
begin
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
)
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
timer:sleep(300),
emqx_resource:set_resource_status_connecting(?ID),
?assertEqual({ok, connected}, emqx_resource:health_check(?ID)),
?assertMatch(
[#{status := connected}],
emqx_resource:list_instances_verbose()
),
erlang:exit(Pid, shutdown),
?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)),
?assertMatch(
[#{status := disconnected}],
emqx_resource:list_instances_verbose()
),
?assertEqual(ok, emqx_resource:remove_local(?ID))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_unhealthy_target(_) ->
HealthCheckError = {unhealthy_target, "some message"},
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, health_check_error => {msg, HealthCheckError}}
)
),
?assertEqual(
{ok, disconnected},
emqx_resource:health_check(?ID)
),
?assertMatch(
{ok, _Group, #{error := HealthCheckError}},
emqx_resource_manager:lookup(?ID)
),
%% messages are dropped when bridge is unhealthy
lists:foreach(
fun(_) ->
?assertMatch(
{error, {resource_error, #{reason := unhealthy_target}}},
emqx_resource:query(?ID, message)
)
end,
lists:seq(1, 3)
),
?assertEqual(3, emqx_resource_metrics:matched_get(?ID)),
?assertEqual(3, emqx_resource_metrics:dropped_resource_stopped_get(?ID)).
t_stop_start(_) ->
?check_trace(
begin
?assertMatch(
{error, _},
emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
?assertMatch(
{ok, _},
emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}
)
),
%% add some metrics to test their persistence
WorkerID0 = <<"worker:0">>,
WorkerID1 = <<"worker:1">>,
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2),
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3),
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
?assertMatch(
{ok, _},
emqx_resource:check_and_recreate_local(
?ID,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>},
#{}
)
),
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid0)),
%% metrics are reset when recreating
%% depending on timing, might show the request we just did.
ct:sleep(500),
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
ok = emqx_resource:stop(?ID),
?assertNot(is_process_alive(Pid0)),
?assertMatch(
?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state)
),
?assertEqual(ok, emqx_resource:restart(?ID)),
timer:sleep(300),
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1)),
%% now stop while resetting the metrics
ct:sleep(500),
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
?assertEqual(ok, emqx_resource:stop(?ID)),
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_stop_start_local(_) ->
?check_trace(
begin
?assertMatch(
{error, _},
emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
?assertMatch(
{ok, _},
emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}
)
),
?assertMatch(
{ok, _},
emqx_resource:check_and_recreate_local(
?ID,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>},
#{}
)
),
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid0)),
?assertEqual(ok, emqx_resource:stop(?ID)),
?assertNot(is_process_alive(Pid0)),
?assertMatch(
?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state)
),
?assertEqual(ok, emqx_resource:restart(?ID)),
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_list_filter(_) ->
{ok, _} = create(
emqx_resource:generate_id(<<"a">>),
<<"group1">>,
?TEST_RESOURCE,
#{name => a}
),
{ok, _} = create(
emqx_resource:generate_id(<<"a">>),
<<"group2">>,
?TEST_RESOURCE,
#{name => grouped_a}
),
[Id1] = emqx_resource:list_group_instances(<<"group1">>),
?assertMatch(
{ok, <<"group1">>, #{config := #{name := a}}},
emqx_resource:get_instance(Id1)
),
[Id2] = emqx_resource:list_group_instances(<<"group2">>),
?assertMatch(
{ok, <<"group2">>, #{config := #{name := grouped_a}}},
emqx_resource:get_instance(Id2)
).
t_create_dry_run_local(_) ->
lists:foreach(
fun(_) ->
create_dry_run_local_succ()
end,
lists:seq(1, 10)
),
?retry(
100,
5,
?assertEqual(
[],
emqx_resource:list_instances_verbose()
)
).
create_dry_run_local_succ() ->
?assertEqual(
ok,
emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{name => test_resource, register => true}
)
),
?assertEqual(undefined, whereis(test_resource)).
t_create_dry_run_local_failed(_) ->
ct:timetrap({seconds, 120}),
ct:pal("creating with creation error"),
Res1 = emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{create_error => true}
),
?assertMatch({error, _}, Res1),
ct:pal("creating with health check error"),
Res2 = emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{name => test_resource, health_check_error => true}
),
?assertMatch({error, _}, Res2),
ct:pal("creating with stop error"),
Res3 = emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{name => test_resource, stop_error => true}
),
?assertEqual(ok, Res3),
?retry(
100,
5,
?assertEqual(
[],
emqx_resource:list_instances_verbose()
)
).
t_test_func(_) ->
IsErrorMsgPlainString = fun({error, Msg}) -> io_lib:printable_list(Msg) end,
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),
?assertEqual(ok, erlang:apply(emqx_resource_validator:min(int, 3), [4])),
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(array, 10), [[a, b, c, d]])),
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(string, 10), ["less10"])),
?assertEqual(
true, IsErrorMsgPlainString(erlang:apply(emqx_resource_validator:min(int, 66), [42]))
),
?assertEqual(
true, IsErrorMsgPlainString(erlang:apply(emqx_resource_validator:max(int, 42), [66]))
),
?assertEqual(
true, IsErrorMsgPlainString(erlang:apply(emqx_resource_validator:min(array, 3), [[1, 2]]))
),
?assertEqual(
true,
IsErrorMsgPlainString(erlang:apply(emqx_resource_validator:max(array, 3), [[1, 2, 3, 4]]))
),
?assertEqual(
true, IsErrorMsgPlainString(erlang:apply(emqx_resource_validator:min(string, 3), ["1"]))
),
?assertEqual(
true, IsErrorMsgPlainString(erlang:apply(emqx_resource_validator:max(string, 3), ["1234"]))
),
NestedMsg = io_lib:format("The answer: ~p", [42]),
ExpectedMsg = "The answer: 42",
BinMsg = <<"The answer: 42">>,
MapMsg = #{question => "The question", answer => 42},
?assertEqual(
{error, ExpectedMsg},
erlang:apply(emqx_resource_validator:not_empty(NestedMsg), [""])
),
?assertEqual(
{error, ExpectedMsg},
erlang:apply(emqx_resource_validator:not_empty(NestedMsg), [<<>>])
),
?assertEqual(
{error, ExpectedMsg},
erlang:apply(emqx_resource_validator:not_empty(NestedMsg), [undefined])
),
?assertEqual(
{error, ExpectedMsg},
erlang:apply(emqx_resource_validator:not_empty(NestedMsg), [undefined])
),
?assertEqual(
{error, BinMsg},
erlang:apply(emqx_resource_validator:not_empty(BinMsg), [undefined])
),
?assertEqual(
{error, MapMsg},
erlang:apply(emqx_resource_validator:not_empty(MapMsg), [""])
).
t_reset_metrics(_) ->
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
emqx_resource:reset_metrics(?ID),
?assert(is_process_alive(Pid)),
ok = emqx_resource:remove_local(?ID),
?assertNot(is_process_alive(Pid)).
t_auto_retry(_) ->
{Res, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, create_error => true},
#{health_check_interval => 100}
),
?assertEqual(ok, Res).
%% tests resources that have an asynchronous start: they are created
%% without problems, but later some issue is found when calling the
%% health check.
t_start_throw_error(_Config) ->
Message = "something went wrong",
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, health_check_error => {msg, Message}},
#{health_check_interval => 100}
),
#{?snk_kind := connector_demo_health_check_error},
1_000
)
),
%% Now, if we try to "reconnect" (restart) it, we should get the error
?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})),
ok.
t_health_check_disconnected(_) ->
?check_trace(
begin
_ = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, create_error => true},
#{health_check_interval => 100}
),
?assertEqual(
{ok, disconnected},
emqx_resource:health_check(?ID)
)
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_unblock_only_required_buffer_workers(_) ->
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 5,
metrics_flush_interval => 50,
batch_time => 100
}
),
lists:foreach(
fun emqx_resource_buffer_worker:block/1,
emqx_resource_buffer_worker_sup:worker_pids(?ID)
),
create(
?ID1,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 5,
batch_time => 100
}
),
%% creation of `?ID1` should not have unblocked `?ID`'s buffer workers
%% so we should see resumes now (`buffer_worker_enter_running`).
?check_trace(
?wait_async_action(
lists:foreach(
fun emqx_resource_buffer_worker:resume/1,
emqx_resource_buffer_worker_sup:worker_pids(?ID)
),
#{?snk_kind := buffer_worker_enter_running},
5000
),
fun(Trace) ->
?assertMatch(
[#{id := ?ID} | _],
?of_kind(buffer_worker_enter_running, Trace)
)
end
).
t_retry_batch(_Config) ->
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 5,
batch_time => 100,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => 1_000
}
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
Matched0 = emqx_resource_metrics:matched_get(?ID),
?assertEqual(1, Matched0),
%% these requests will batch together and fail; the buffer worker
%% will enter the `blocked' state and they'll be retried later,
%% after it unblocks.
Payloads = lists:seq(1, 5),
NumPayloads = length(Payloads),
ExpectedCount = 15,
?check_trace(
begin
{ok, {ok, _}} =
?wait_async_action(
lists:foreach(
fun(N) ->
ok = emqx_resource:query(?ID, {inc_counter, N})
end,
Payloads
),
#{?snk_kind := buffer_worker_enter_blocked},
5_000
),
%% now the individual messages should have been counted
Matched1 = emqx_resource_metrics:matched_get(?ID),
?assertEqual(Matched0 + NumPayloads, Matched1),
%% wait for two more retries while the failure is enabled; the
%% batch shall remain enqueued.
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := buffer_worker_retry_inflight_failed}),
5_000
),
%% should not have increased the matched count with the retries
Matched2 = emqx_resource_metrics:matched_get(?ID),
?assertEqual(Matched1, Matched2),
%% now unblock the buffer worker so it may retry the batch,
%% but it'll still fail
{ok, {ok, _}} =
?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
5_000
),
%% 1 more because of the `resume' call
Matched3 = emqx_resource_metrics:matched_get(?ID),
?assertEqual(Matched2 + 1, Matched3),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
{Counter, Matched3}
end,
fun({Counter, Matched3}, Trace) ->
%% 1 original attempt + 2 failed retries + final
%% successful attempt.
%% each time should be the original batch (no duplicate
%% elements or reordering).
ExpectedSeenPayloads = lists:flatten(lists:duplicate(4, Payloads)),
Trace1 = lists:sublist(
?projection(n, ?of_kind(connector_demo_batch_inc_individual, Trace)),
length(ExpectedSeenPayloads)
),
?assertEqual(ExpectedSeenPayloads, Trace1),
?assertMatch(
[#{n := ExpectedCount}],
?of_kind(connector_demo_inc_counter, Trace)
),
?assertEqual(ExpectedCount, Counter),
%% matched should count only the original requests, and not retries
%% + 1 for `resume' call
%% + 1 for `block' call
%% + 1 for `get_counter' call
%% and the message count (1 time)
Matched4 = emqx_resource_metrics:matched_get(?ID),
?assertEqual(Matched3 + 1, Matched4),
ok
end
),
ok.
t_delete_and_re_create_with_same_name(_Config) ->
NumBufferWorkers = 2,
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => NumBufferWorkers,
buffer_mode => volatile_offload,
buffer_seg_bytes => 100,
metrics_flush_interval => 50,
resume_interval => 1_000
}
),
%% pre-condition: we should have just created a new queue
Queuing0 = emqx_resource_metrics:queuing_get(?ID),
Inflight0 = emqx_resource_metrics:inflight_get(?ID),
?assertEqual(0, Queuing0),
?assertEqual(0, Inflight0),
?check_trace(
begin
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
NumRequests = 10,
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_enter_blocked}),
NumBufferWorkers,
_Timeout = 5_000
),
%% ensure replayq offloads to disk
Payload = binary:copy(<<"a">>, 119),
lists:foreach(
fun(N) ->
spawn_link(fun() ->
{error, _} =
emqx_resource:query(
?ID,
{big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
)
end)
end,
lists:seq(1, NumRequests)
),
{ok, _} = snabbkaffe:receive_events(SRef),
%% ensure that stuff got enqueued into disk
tap_metrics(?LINE),
?retry(
_Sleep = 300,
_Attempts0 = 20,
?assert(emqx_resource_metrics:queuing_get(?ID) > 0)
),
?retry(
_Sleep = 300,
_Attempts0 = 20,
?assertEqual(2, emqx_resource_metrics:inflight_get(?ID))
),
%% now, we delete the resource
process_flag(trap_exit, true),
ok = emqx_resource:remove_local(?ID),
?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),
%% re-create the resource with the *same name*
{{ok, _}, {ok, _Events}} =
?wait_async_action(
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 2,
buffer_seg_bytes => 100,
resume_interval => 1_000
}
),
#{?snk_kind := buffer_worker_enter_running},
5_000
),
%% it shouldn't have anything enqueued, as it's a fresh resource
Queuing2 = emqx_resource_metrics:queuing_get(?ID),
Inflight2 = emqx_resource_metrics:queuing_get(?ID),
?assertEqual(0, Queuing2),
?assertEqual(0, Inflight2),
ok
end,
[]
),
ok.
%% check that, if we configure a max queue size too small, then we
%% never send requests and always overflow.
t_always_overflow(_Config) ->
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 1,
max_buffer_bytes => 1,
metrics_flush_interval => 50,
resume_interval => 1_000
}
),
?check_trace(
begin
Payload = binary:copy(<<"a">>, 100),
%% since it's sync and it should never send a request, this
%% errors with `timeout'.
?assertEqual(
{error, buffer_overflow},
emqx_resource:query(
?ID,
{big_payload, Payload},
#{timeout => 500}
)
),
ok
end,
fun(Trace) ->
?assertEqual([], ?of_kind(call_query_enter, Trace)),
ok
end
),
ok.
t_retry_sync_inflight(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% now really make the resource go into `blocked' state.
%% this results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
TestPid = self(),
{_, {ok, _}} =
?wait_async_action(
spawn_link(fun() ->
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
TestPid ! {res, Res}
end),
#{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2
),
{ok, {ok, _}} =
?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
receive
{res, Res} ->
?assertEqual(ok, Res)
after 5_000 ->
ct:fail("no response")
end,
ok
end,
[fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
),
ok.
t_retry_sync_inflight_batch(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 200,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% make the resource go into `blocked' state. this
%% results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
process_flag(trap_exit, true),
TestPid = self(),
{_, {ok, _}} =
?wait_async_action(
spawn_link(fun() ->
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
TestPid ! {res, Res}
end),
#{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2
),
{ok, {ok, _}} =
?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
receive
{res, Res} ->
?assertEqual(ok, Res)
after 5_000 ->
ct:fail("no response")
end,
ok
end,
[fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
),
ok.
t_retry_async_inflight(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% block
ok = emqx_resource:simple_sync_query(?ID, block),
%% then send an async request; that should be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2
),
%% will reply with success after the resource is healed
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_enter_running},
ResumeInterval * 2
),
ok
end,
[fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
),
ok.
t_retry_async_inflight_full(_Config) ->
ResumeInterval = 1_000,
AsyncInflightWindow = 5,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => ?FUNCTION_NAME},
#{
query_mode => async,
inflight_window => AsyncInflightWindow,
batch_size => 1,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
?check_trace(
#{timetrap => 15_000},
begin
%% block
ok = emqx_resource:simple_sync_query(?ID, block),
{ok, {ok, _}} =
?wait_async_action(
inc_counter_in_parallel(
AsyncInflightWindow * 2,
fun() ->
For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4),
{sleep_before_reply, For}
end,
#{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}}
),
#{?snk_kind := buffer_worker_flush_but_inflight_full},
ResumeInterval * 2
),
%% will reply with success after the resource is healed
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_enter_running}
),
ok
end,
[
fun(Trace) ->
?assertMatch([#{} | _], ?of_kind(buffer_worker_flush_but_inflight_full, Trace))
end
]
),
?retry(
_Sleep = 300,
_Attempts0 = 20,
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
),
ok.
%% this test case is to ensure the buffer worker will not go crazy even
%% if the underlying connector is misbehaving: evaluate async callbacks multiple times
t_async_reply_multi_eval(_Config) ->
ResumeInterval = 5,
TotalTime = 5_000,
AsyncInflightWindow = 3,
TotalQueries = AsyncInflightWindow * 5,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => ?FUNCTION_NAME},
#{
query_mode => async,
inflight_window => AsyncInflightWindow,
batch_size => 3,
batch_time => 10,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
%% block
ok = emqx_resource:simple_sync_query(?ID, block),
inc_counter_in_parallel(
TotalQueries,
fun() ->
Rand = rand:uniform(1000),
{random_reply, Rand}
end,
#{}
),
?retry(
2 * ResumeInterval,
TotalTime div ResumeInterval,
begin
Metrics = tap_metrics(?LINE),
#{
counters := Counters,
gauges := #{queuing := 0, inflight := 0}
} = Metrics,
#{
matched := Matched,
success := Success,
dropped := Dropped,
late_reply := LateReply,
failed := Failed
} = Counters,
?assertEqual(TotalQueries, Matched - 1),
?assertEqual(Matched, Success + Dropped + LateReply + Failed, #{counters => Counters})
end
).
t_retry_async_inflight_batch(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 200,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% block
ok = emqx_resource:simple_sync_query(?ID, block),
%% then send an async request; that should be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := buffer_worker_retry_inflight_failed},
ResumeInterval * 2
),
%% will reply with success after the resource is healed
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_enter_running},
ResumeInterval * 2
),
ok
end,
[fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
),
ok.
%% check that we monitor async worker pids and abort their inflight
%% requests if they die.
t_async_pool_worker_death(_Config) ->
ResumeInterval = 1_000,
NumBufferWorkers = 2,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => NumBufferWorkers,
metrics_refresh_interval => 50,
resume_interval => ResumeInterval
}
),
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
Insert0 = fun(Tab, Ref, Result) ->
ct:pal("inserting ~p", [{Ref, Result}]),
ets:insert(Tab, {Ref, Result})
end,
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
NumReqs = 10,
{ok, SRef0} =
snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_appended_to_inflight}),
NumReqs,
1_000
),
inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef0),
?retry(
_Sleep = 300,
_Attempts0 = 20,
?assertEqual(NumReqs, emqx_resource_metrics:inflight_get(?ID))
),
%% grab one of the worker pids and kill it
{ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
MRef = monitor(process, Pid0),
ct:pal("will kill ~p", [Pid0]),
exit(Pid0, kill),
receive
{'DOWN', MRef, process, Pid0, killed} ->
ct:pal("~p killed", [Pid0]),
ok
after 200 ->
ct:fail("worker should have died")
end,
%% inflight requests should have been marked as retriable
wait_until_all_marked_as_retriable(NumReqs),
Inflight1 = emqx_resource_metrics:inflight_get(?ID),
?assertEqual(NumReqs, Inflight1),
NumReqs
end,
fun(NumReqs, Trace) ->
Events = ?of_kind(buffer_worker_async_agent_down, Trace),
%% At least one buffer worker should have marked its
%% requests as retriable. If a single one has
%% received all requests, that's all we got.
?assertMatch([_ | _], Events),
%% All requests distributed over all buffer workers
%% should have been marked as retriable, by the time
%% the inflight has been drained.
?assertEqual(
NumReqs,
lists:sum([N || #{num_affected := N} <- Events])
),
%% The `DOWN' signal must trigger the transition to the `blocked' state,
%% otherwise the request won't be retried until the buffer worker is `blocked'
%% for other reasons.
?assert(
?strict_causality(
#{?snk_kind := buffer_worker_async_agent_down, buffer_worker := _Pid0},
#{?snk_kind := buffer_worker_enter_blocked, buffer_worker := _Pid1},
_Pid0 =:= _Pid1,
Trace
)
),
ok
end
),
ok.
t_expiration_sync_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(sync).
t_expiration_sync_batch_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(sync).
t_expiration_async_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(async).
t_expiration_async_batch_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(async).
do_t_expiration_before_sending(QueryMode) ->
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
?force_ordering(
#{?snk_kind := buffer_worker_flush_before_pop},
#{?snk_kind := delay_enter}
),
?force_ordering(
#{?snk_kind := delay},
#{?snk_kind := buffer_worker_flush_before_sieve_expired}
),
TimeoutMS = 100,
spawn_link(fun() ->
case QueryMode of
sync ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS})
);
async ->
?assertEqual(
ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS})
)
end
end),
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(#{?snk_kind := buffer_worker_flush_all_expired}, 4 * TimeoutMS),
ok
end,
fun(Trace) ->
?assertMatch(
[#{batch := [{query, _, {inc_counter, 99}, _, _}]}],
?of_kind(buffer_worker_flush_all_expired, Trace)
),
Metrics = tap_metrics(?LINE),
?assertMatch(
#{
counters := #{
matched := 2,
%% the block call
success := 1,
dropped := 1,
'dropped.expired' := 1,
retried := 0,
failed := 0
}
},
Metrics
),
ok
end
),
ok.
t_expiration_sync_before_sending_partial_batch(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
metrics_flush_interval => 250,
resume_interval => 1_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_before_sending_partial_batch(sync).
t_expiration_async_before_sending_partial_batch(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
metrics_flush_interval => 250,
resume_interval => 1_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_before_sending_partial_batch(async).
do_t_expiration_before_sending_partial_batch(QueryMode) ->
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
?force_ordering(
#{?snk_kind := buffer_worker_flush_before_pop},
#{?snk_kind := delay_enter}
),
?force_ordering(
#{?snk_kind := delay},
#{?snk_kind := buffer_worker_flush_before_sieve_expired}
),
Pid0 =
spawn_link(fun() ->
?assertEqual(
ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
),
?tp(infinity_query_returned, #{})
end),
TimeoutMS = 100,
Pid1 =
spawn_link(fun() ->
case QueryMode of
sync ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
);
async ->
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
)
end
end),
Pid2 =
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(
#{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
),
ok = emqx_resource:simple_sync_query(?ID, resume),
case QueryMode of
async ->
{ok, _} = ?block_until(
#{
?snk_kind := handle_async_reply,
action := ack,
batch_or_query := [{query, _, {inc_counter, 99}, _, _}]
},
10 * TimeoutMS
);
sync ->
%% more time because it needs to retry if sync
{ok, _} = ?block_until(#{?snk_kind := infinity_query_returned}, 20 * TimeoutMS)
end,
lists:foreach(
fun(Pid) ->
unlink(Pid),
exit(Pid, kill)
end,
[Pid0, Pid1, Pid2]
),
ok
end,
fun(Trace) ->
?assertMatch(
[
#{
expired := [{query, _, {inc_counter, 199}, _, _}],
not_expired := [{query, _, {inc_counter, 99}, _, _}]
}
],
?of_kind(buffer_worker_flush_potentially_partial, Trace)
),
wait_until_gauge_is(
inflight,
#{
expected_value => 0,
timeout => 500,
max_events => 10
}
),
Metrics = tap_metrics(?LINE),
case QueryMode of
async ->
?assertMatch(
#{
counters := #{
matched := 4,
%% the block call, the request with
%% infinity timeout, and the resume
%% call.
success := 3,
dropped := 1,
'dropped.expired' := 1,
%% was sent successfully and held by
%% the test connector.
retried := 0,
failed := 0
}
},
Metrics
);
sync ->
?assertMatch(
#{
counters := #{
matched := 4,
%% the block call, the request with
%% infinity timeout, and the resume
%% call.
success := 3,
dropped := 1,
'dropped.expired' := 1,
%% currently, the test connector
%% replies with an error that may make
%% the buffer worker retry.
retried := Retried,
failed := 0
}
} when Retried =< 1,
Metrics
)
end,
ok
end
),
ok.
t_expiration_async_after_reply(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 1,
resume_interval => 1_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_async_after_reply(single).
t_expiration_async_batch_after_reply(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 3,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 2_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_async_after_reply(batch).
do_t_expiration_async_after_reply(IsBatch) ->
?check_trace(
begin
NAcks =
case IsBatch of
batch -> 1;
single -> 3
end,
?force_ordering(
#{?snk_kind := buffer_worker_flush_ack},
NAcks,
#{?snk_kind := delay_enter},
_Guard = true
),
?force_ordering(
#{?snk_kind := delay},
#{
?snk_kind := handle_async_reply_enter,
batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
}
),
TimeoutMS = 100,
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
),
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
),
?assertEqual(
ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
),
Pid0 =
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(
#{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
),
{ok, _} = ?block_until(
#{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
),
wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
unlink(Pid0),
exit(Pid0, kill),
ok
end,
fun(Trace) ->
case IsBatch of
batch ->
?assertMatch(
[
#{
expired := [
{query, _, {inc_counter, 199}, _, _},
{query, _, {inc_counter, 299}, _, _}
]
}
],
?of_kind(handle_async_reply_expired, Trace)
),
?assertMatch(
[
#{
inflight_count := 1,
num_inflight_messages := 1
}
],
?of_kind(handle_async_reply_partially_expired, Trace)
);
single ->
?assertMatch(
[
#{expired := [{query, _, {inc_counter, 199}, _, _}]},
#{expired := [{query, _, {inc_counter, 299}, _, _}]}
],
?of_kind(handle_async_reply_expired, Trace)
)
end,
Metrics = tap_metrics(?LINE),
?assertMatch(
#{
counters := #{
matched := 3,
%% the request with infinity timeout.
success := 1,
dropped := 0,
late_reply := 2,
retried := 0,
failed := 0
}
},
Metrics
),
ok
end
),
ok.
t_expiration_batch_all_expired_after_reply(_Config) ->
ResumeInterval = 300,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 3,
batch_time => 100,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
?check_trace(
begin
?force_ordering(
#{?snk_kind := buffer_worker_flush_ack},
#{?snk_kind := delay_enter}
),
?force_ordering(
#{?snk_kind := delay},
#{
?snk_kind := handle_async_reply_enter,
batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
}
),
TimeoutMS = 200,
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
),
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
),
Pid0 =
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(
#{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
),
unlink(Pid0),
exit(Pid0, kill),
ok
end,
fun(Trace) ->
?assertMatch(
[
#{
expired := [
{query, _, {inc_counter, 199}, _, _},
{query, _, {inc_counter, 299}, _, _}
]
}
],
?of_kind(handle_async_reply_expired, Trace)
),
Metrics = tap_metrics(?LINE),
?assertMatch(
#{
counters := #{
matched := 2,
success := 0,
dropped := 0,
late_reply := 2,
retried := 0,
failed := 0
},
gauges := #{
inflight := 0,
queuing := 0
}
},
Metrics
),
ok
end
),
ok.
t_expiration_retry(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 1,
resume_interval => 300
}
),
do_t_expiration_retry().
t_expiration_retry_batch(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 300
}
),
do_t_expiration_retry().
do_t_expiration_retry() ->
ResumeInterval = 300,
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
1,
200
),
TimeoutMS = 100,
%% the request that expires must be first, so it's the
%% head of the inflight table (and retriable).
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_appended_to_queue}),
1,
ResumeInterval * 2
),
spawn_link(fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(
?ID,
{inc_counter, 1},
#{timeout => TimeoutMS}
)
)
end),
Pid1 =
spawn_link(fun() ->
receive
go -> ok
end,
?assertEqual(
ok,
emqx_resource:query(
?ID,
{inc_counter, 2},
#{timeout => infinity}
)
)
end),
{ok, _} = snabbkaffe:receive_events(SRef1),
Pid1 ! go,
{ok, _} = snabbkaffe:receive_events(SRef0),
{ok, _} =
?block_until(
#{?snk_kind := buffer_worker_retry_expired},
ResumeInterval * 10
),
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 5
),
ok
end,
fun(Trace) ->
?assertMatch(
[#{expired := [{query, _, {inc_counter, 1}, _, _}]}],
?of_kind(buffer_worker_retry_expired, Trace)
),
Metrics = tap_metrics(?LINE),
?assertMatch(
#{
gauges := #{
inflight := 0,
queuing := 0
}
},
Metrics
),
ok
end
),
ok.
t_expiration_retry_batch_multiple_times(_Config) ->
ResumeInterval = 300,
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
1,
200
),
TimeoutMS = 100,
spawn_link(fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(
?ID,
{inc_counter, 1},
#{timeout => TimeoutMS}
)
)
end),
spawn_link(fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(
?ID,
{inc_counter, 2},
#{timeout => ResumeInterval + TimeoutMS}
)
)
end),
{ok, _} = snabbkaffe:receive_events(SRef),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := buffer_worker_retry_expired}),
ResumeInterval * 10
),
ok
end,
fun(Trace) ->
?assertMatch(
[
#{expired := [{query, _, {inc_counter, 1}, _, _}]},
#{expired := [{query, _, {inc_counter, 2}, _, _}]}
],
?of_kind(buffer_worker_retry_expired, Trace)
),
ok
end
),
ok.
t_batch_individual_reply_sync(_Config) ->
ResumeInterval = 300,
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 5,
batch_time => 100,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
do_t_batch_individual_reply().
t_batch_individual_reply_async(_Config) ->
ResumeInterval = 300,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 5,
batch_time => 100,
worker_pool_size => 1,
metrics_flush_interval => 50,
resume_interval => ResumeInterval
}
),
on_exit(fun() -> emqx_resource:remove_local(?ID) end),
do_t_batch_individual_reply().
do_t_batch_individual_reply() ->
?check_trace(
begin
{Results, {ok, _}} =
?wait_async_action(
emqx_utils:pmap(
fun(N) ->
emqx_resource:query(?ID, {individual_reply, N rem 2 =:= 0})
end,
lists:seq(1, 5)
),
#{?snk_kind := buffer_worker_flush_ack, batch_or_query := [_, _ | _]},
5_000
),
Ok = ok,
Error = {error, {unrecoverable_error, bad_request}},
?assertEqual([Error, Ok, Error, Ok, Error], Results),
?retry(
200,
10,
?assertMatch(
#{
counters := #{
matched := 5,
failed := 3,
success := 2
}
},
tap_metrics(?LINE)
)
),
ok
end,
[]
),
ok.
t_recursive_flush(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 1
}
),
do_t_recursive_flush().
t_recursive_flush_batch(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 10_000,
worker_pool_size => 1
}
),
do_t_recursive_flush().
do_t_recursive_flush() ->
?check_trace(
begin
Timeout = 1_000,
Pid = spawn_link(fun S() ->
emqx_resource:query(?ID, {inc_counter, 1}),
S()
end),
%% we want two reflushes to happen before we analyze the
%% trace, so that we get a single full interaction
{ok, _} = snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := buffer_worker_flush_ack_reflush}), Timeout
),
unlink(Pid),
exit(Pid, kill),
ok
end,
fun(Trace) ->
%% check that a recursive flush leads to a new call to flush/1
Pairs = ?find_pairs(
#{?snk_kind := buffer_worker_flush_ack_reflush},
#{?snk_kind := buffer_worker_flush},
Trace
),
?assert(lists:any(fun(E) -> E end, [true || {pair, _, _} <- Pairs]))
end
),
ok.
t_call_mode_uncoupled_from_query_mode(_Config) ->
DefaultOpts = #{
batch_size => 1,
batch_time => 5,
worker_pool_size => 1
},
?check_trace(
begin
%% We check that we can call the buffer workers with async
%% calls, even if the underlying connector itself only
%% supports sync calls.
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{query_mode => async}
),
?tp_span(
async_query_sync_driver,
#{},
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 1}),
#{?snk_kind := buffer_worker_flush_ack},
500
)
)
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
%% And we check the converse: a connector that allows async
%% calls can be called synchronously, but the underlying
%% call should be async.
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{query_mode => sync}
),
?tp_span(
sync_query_async_driver,
#{},
?assertEqual(ok, emqx_resource:query(?ID, {inc_counter, 2}))
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
?tp(sync_query_async_driver, #{}),
ok
end,
fun(Trace0) ->
Trace1 = trace_between_span(Trace0, async_query_sync_driver),
ct:pal("async query calling sync driver\n ~p", [Trace1]),
?assert(
?strict_causality(
#{?snk_kind := async_query, request := {inc_counter, 1}},
#{?snk_kind := call_query, call_mode := sync},
Trace1
)
),
Trace2 = trace_between_span(Trace0, sync_query_async_driver),
ct:pal("sync query calling async driver\n ~p", [Trace2]),
?assert(
?strict_causality(
#{?snk_kind := sync_query, request := {inc_counter, 2}},
#{?snk_kind := call_query_async},
Trace2
)
),
ok
end
).
%% The default mode is currently `memory_only'.
t_volatile_offload_mode(_Config) ->
MaxBufferBytes = 1_000,
DefaultOpts = #{
max_buffer_bytes => MaxBufferBytes,
worker_pool_size => 1
},
?check_trace(
begin
emqx_connector_demo:set_callback_mode(async_if_possible),
%% Create without any specified segment bytes; should
%% default to equal max bytes.
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{buffer_mode => volatile_offload}
)
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
%% Create with segment bytes < max bytes
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{
buffer_mode => volatile_offload,
buffer_seg_bytes => MaxBufferBytes div 2
}
)
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
%% Create with segment bytes = max bytes
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{
buffer_mode => volatile_offload,
buffer_seg_bytes => MaxBufferBytes
}
)
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
%% Create with segment bytes > max bytes; should normalize
%% to max bytes.
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{
buffer_mode => volatile_offload,
buffer_seg_bytes => 2 * MaxBufferBytes
}
)
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
ok
end,
fun(Trace) ->
HalfMaxBufferBytes = MaxBufferBytes div 2,
?assertMatch(
[
#{
dir := _,
max_total_bytes := MaxTotalBytes,
seg_bytes := MaxTotalBytes,
offload := {true, volatile}
},
#{
dir := _,
max_total_bytes := MaxTotalBytes,
%% uses the specified value since it's smaller
%% than max bytes.
seg_bytes := HalfMaxBufferBytes,
offload := {true, volatile}
},
#{
dir := _,
max_total_bytes := MaxTotalBytes,
seg_bytes := MaxTotalBytes,
offload := {true, volatile}
},
#{
dir := _,
max_total_bytes := MaxTotalBytes,
seg_bytes := MaxTotalBytes,
offload := {true, volatile}
}
],
?projection(queue_opts, ?of_kind(buffer_worker_init, Trace))
),
ok
end
).
t_late_call_reply(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
RequestTTL = 500,
?assertMatch(
{ok, _},
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
buffer_mode => memory_only,
request_ttl => RequestTTL,
query_mode => sync
}
)
),
?check_trace(
begin
%% Sleep for longer than the request timeout; the call reply will
%% have been already returned (a timeout), but the resource will
%% still send a message with the reply.
%% The demo connector will reply with `{error, timeout}' after 1 s.
SleepFor = RequestTTL + 500,
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(
?ID,
{sync_sleep_before_reply, SleepFor},
#{timeout => RequestTTL}
)
),
%% Our process shouldn't receive any late messages.
receive
LateReply ->
ct:fail("received late reply: ~p", [LateReply])
after SleepFor ->
ok
end,
ok
end,
[]
),
ok.
t_resource_create_error_activate_alarm_once(_) ->
do_t_resource_activate_alarm_once(
#{name => test_resource, create_error => true},
connector_demo_start_error
).
t_resource_health_check_error_activate_alarm_once(_) ->
do_t_resource_activate_alarm_once(
#{name => test_resource, health_check_error => true},
connector_demo_health_check_error
).
do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
?check_trace(
begin
?wait_async_action(
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
ResourceConfig,
#{health_check_interval => 100}
),
#{?snk_kind := resource_activate_alarm, resource_id := ?ID}
),
?assertMatch([#{activated := true, name := ?ID}], emqx_alarm:get_alarms(activated)),
{ok, SubRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := SubscribeEvent}), 4, 7000
),
?assertMatch({ok, [_, _, _, _]}, snabbkaffe:receive_events(SubRef))
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(resource_activate_alarm, Trace))
end
).
t_telemetry_handler_crash(_Config) ->
%% Check that a crash while handling a telemetry event, such as when a busy resource
%% is restarted and its metrics are not recreated while handling an increment, does
%% not lead to the handler being uninstalled.
?check_trace(
begin
NonExistentId = <<"I-dont-exist">>,
WorkerId = 1,
HandlersBefore = telemetry:list_handlers([?TELEMETRY_PREFIX]),
?assertMatch([_ | _], HandlersBefore),
lists:foreach(fun(Fn) -> Fn(NonExistentId) end, counter_metric_inc_fns()),
emqx_common_test_helpers:with_mock(
emqx_metrics_worker,
set_gauge,
fun(_Name, _Id, _WorkerId, _Metric, _Val) ->
error(random_crash)
end,
fun() ->
lists:foreach(
fun(Fn) -> Fn(NonExistentId, WorkerId, 1) end, gauge_metric_set_fns()
)
end
),
?assertEqual(HandlersBefore, telemetry:list_handlers([?TELEMETRY_PREFIX])),
ok
end,
[]
),
ok.
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
inc_counter_in_parallel(N) ->
inc_counter_in_parallel(N, {inc_counter, 1}, #{}).
inc_counter_in_parallel(N, Opts0) ->
inc_counter_in_parallel(N, {inc_counter, 1}, Opts0).
inc_counter_in_parallel(N, Query, Opts) ->
Parent = self(),
Pids = [
erlang:spawn(fun() ->
emqx_resource:query(?ID, maybe_apply(Query), maybe_apply(Opts)),
Parent ! {complete, self()}
end)
|| _ <- lists:seq(1, N)
],
[
receive
{complete, Pid} -> ok
after 1000 ->
ct:fail({wait_for_query_timeout, Pid})
end
|| Pid <- Pids
],
ok.
inc_counter_in_parallel_increasing(N, StartN, Opts) ->
Parent = self(),
Pids = [
erlang:spawn(fun() ->
emqx_resource:query(?ID, {inc_counter, M}, maybe_apply(Opts)),
Parent ! {complete, self()}
end)
|| M <- lists:seq(StartN, StartN + N - 1)
],
[
receive
{complete, Pid} -> ok
after 1000 ->
ct:fail({wait_for_query_timeout, Pid})
end
|| Pid <- Pids
].
maybe_apply(FunOrTerm) ->
maybe_apply(FunOrTerm, []).
maybe_apply(Fun, Args) when is_function(Fun) ->
erlang:apply(Fun, Args);
maybe_apply(Term, _Args) ->
Term.
bin_config() ->
<<"\"name\": \"test_resource\"">>.
config() ->
{ok, Config} = hocon:binary(bin_config()),
Config.
tap_metrics(Line) ->
#{counters := C, gauges := G} = emqx_resource:get_metrics(?ID),
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
#{counters => C, gauges => G}.
install_telemetry_handler(TestCase) ->
Tid = ets:new(TestCase, [ordered_set, public]),
HandlerId = TestCase,
TestPid = self(),
_ = telemetry:attach_many(
HandlerId,
emqx_resource_metrics:events(),
fun(EventName, Measurements, Metadata, _Config) ->
Data = #{
name => EventName,
measurements => Measurements,
metadata => Metadata
},
ets:insert(Tid, {erlang:monotonic_time(), Data}),
TestPid ! {telemetry, Data},
ok
end,
unused_config
),
on_exit(fun() ->
telemetry:detach(HandlerId),
ets:delete(Tid)
end),
put({?MODULE, telemetry_table}, Tid),
Tid.
wait_until_gauge_is(
GaugeName,
#{
expected_value := ExpectedValue,
timeout := Timeout,
max_events := MaxEvents
}
) ->
Events = receive_all_events(GaugeName, Timeout, MaxEvents),
case length(Events) > 0 andalso lists:last(Events) of
#{measurements := #{gauge_set := ExpectedValue}} ->
ok;
#{measurements := #{gauge_set := Value}} ->
ct:fail(
"gauge ~p didn't reach expected value ~p; last value: ~p",
[GaugeName, ExpectedValue, Value]
);
false ->
ct:pal("no ~p gauge events received!", [GaugeName])
end.
receive_all_events(EventName, Timeout) ->
receive_all_events(EventName, Timeout, _MaxEvents = 50, _Count = 0, _Acc = []).
receive_all_events(EventName, Timeout, MaxEvents) ->
receive_all_events(EventName, Timeout, MaxEvents, _Count = 0, _Acc = []).
receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
lists:reverse(Acc);
receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
receive
{telemetry, #{name := [_, _, EventName]} = Event} ->
ct:pal("telemetry event: ~p", [Event]),
receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
after Timeout ->
lists:reverse(Acc)
end.
wait_telemetry_event(EventName) ->
wait_telemetry_event(EventName, #{timeout => 5_000, n_events => 1}).
wait_telemetry_event(
EventName,
Opts0
) ->
DefaultOpts = #{timeout => 5_000, n_events => 1},
#{timeout := Timeout, n_events := NEvents} = maps:merge(DefaultOpts, Opts0),
wait_n_events(NEvents, Timeout, EventName).
wait_n_events(NEvents, _Timeout, _EventName) when NEvents =< 0 ->
ok;
wait_n_events(NEvents, Timeout, EventName) ->
TelemetryTable = get({?MODULE, telemetry_table}),
receive
{telemetry, #{name := [_, _, EventName]}} ->
wait_n_events(NEvents - 1, Timeout, EventName)
after Timeout ->
RecordedEvents = ets:tab2list(TelemetryTable),
ct:pal("recorded events: ~p", [RecordedEvents]),
error({timeout_waiting_for_telemetry, EventName})
end.
assert_sync_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]),
?assert(
?strict_causality(
#{?snk_kind := buffer_worker_flush_nack, ref := _Ref},
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
Trace
)
),
%% not strict causality because it might retry more than once
%% before restoring the resource health.
?assert(
?causality(
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
#{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
Trace
)
),
ok.
assert_async_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]),
?assert(
?strict_causality(
#{?snk_kind := handle_async_reply, action := nack},
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
Trace
)
),
%% not strict causality because it might retry more than once
%% before restoring the resource health.
?assert(
?causality(
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
#{?snk_kind := buffer_worker_retry_inflight_succeeded, ref := _Ref},
Trace
)
),
ok.
trace_between_span(Trace0, Marker) ->
{Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0),
{[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1),
Trace2.
wait_until_all_marked_as_retriable(NumExpected) when NumExpected =< 0 ->
ok;
wait_until_all_marked_as_retriable(NumExpected) ->
Seen = #{},
do_wait_until_all_marked_as_retriable(NumExpected, Seen).
do_wait_until_all_marked_as_retriable(NumExpected, _Seen) when NumExpected =< 0 ->
ok;
do_wait_until_all_marked_as_retriable(NumExpected, Seen) ->
Res = ?block_until(
#{?snk_kind := buffer_worker_async_agent_down, ?snk_meta := #{pid := P}} when
not is_map_key(P, Seen),
10_000
),
case Res of
{timeout, Evts} ->
ct:pal("events so far:\n ~p", [Evts]),
ct:fail("timeout waiting for events");
{ok, #{num_affected := NumAffected, ?snk_meta := #{pid := Pid}}} ->
ct:pal("affected: ~p; pid: ~p", [NumAffected, Pid]),
case NumAffected >= NumExpected of
true ->
ok;
false ->
do_wait_until_all_marked_as_retriable(NumExpected - NumAffected, Seen#{
Pid => true
})
end
end.
counter_metric_inc_fns() ->
Mod = emqx_resource_metrics,
[
fun Mod:Fn/1
|| {Fn, 1} <- Mod:module_info(functions),
case string:find(atom_to_list(Fn), "_inc", trailing) of
"_inc" -> true;
_ -> false
end
].
gauge_metric_set_fns() ->
Mod = emqx_resource_metrics,
[
fun Mod:Fn/3
|| {Fn, 3} <- Mod:module_info(functions),
case string:find(atom_to_list(Fn), "_set", trailing) of
"_set" -> true;
_ -> false
end
].
create(Id, Group, Type, Config) ->
emqx_resource:create_local(Id, Group, Type, Config).
create(Id, Group, Type, Config, Opts) ->
emqx_resource:create_local(Id, Group, Type, Config, Opts).