1159 lines
36 KiB
Erlang
1159 lines
36 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_resource_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include("emqx_resource.hrl").
|
|
-include_lib("stdlib/include/ms_transform.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-define(TEST_RESOURCE, emqx_connector_demo).
|
|
-define(ID, <<"id">>).
|
|
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
|
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
|
|
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
|
|
|
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
groups() ->
|
|
[].
|
|
|
|
init_per_testcase(_, Config) ->
|
|
ct:timetrap({seconds, 30}),
|
|
emqx_connector_demo:set_callback_mode(always_sync),
|
|
Config.
|
|
|
|
end_per_testcase(_, _Config) ->
|
|
snabbkaffe:stop(),
|
|
_ = emqx_resource:remove(?ID),
|
|
emqx_common_test_helpers:call_janitor(),
|
|
ok.
|
|
|
|
init_per_suite(Config) ->
|
|
code:ensure_loaded(?TEST_RESOURCE),
|
|
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
|
{ok, _} = application:ensure_all_started(emqx_resource),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Tests
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_list_types(_) ->
|
|
?assert(lists:member(?TEST_RESOURCE, emqx_resource:list_types())).
|
|
|
|
t_check_config(_) ->
|
|
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, bin_config()),
|
|
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, config()),
|
|
|
|
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, <<"not a config">>),
|
|
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}).
|
|
|
|
t_create_remove(_) ->
|
|
{error, _} = emqx_resource:check_and_create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{unknown => test_resource}
|
|
),
|
|
|
|
{ok, _} = emqx_resource:create(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource}
|
|
),
|
|
|
|
{ok, _} = emqx_resource:recreate(
|
|
?ID,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource},
|
|
#{}
|
|
),
|
|
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
|
|
|
?assert(is_process_alive(Pid)),
|
|
|
|
ok = emqx_resource:remove(?ID),
|
|
{error, _} = emqx_resource:remove(?ID),
|
|
|
|
?assertNot(is_process_alive(Pid)).
|
|
|
|
t_create_remove_local(_) ->
|
|
{error, _} = emqx_resource:check_and_create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{unknown => test_resource}
|
|
),
|
|
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource}
|
|
),
|
|
|
|
emqx_resource:recreate_local(
|
|
?ID,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource},
|
|
#{}
|
|
),
|
|
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
|
|
|
?assert(is_process_alive(Pid)),
|
|
|
|
emqx_resource:set_resource_status_connecting(?ID),
|
|
|
|
emqx_resource:recreate_local(
|
|
?ID,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource},
|
|
#{}
|
|
),
|
|
|
|
ok = emqx_resource:remove_local(?ID),
|
|
{error, _} = emqx_resource:remove_local(?ID),
|
|
|
|
?assertMatch(
|
|
?RESOURCE_ERROR(not_found),
|
|
emqx_resource:query(?ID, get_state)
|
|
),
|
|
?assertNot(is_process_alive(Pid)).
|
|
|
|
t_do_not_start_after_created(_) ->
|
|
ct:pal("creating resource"),
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource},
|
|
#{start_after_created => false}
|
|
),
|
|
%% the resource should remain `disconnected` after created
|
|
timer:sleep(200),
|
|
?assertMatch(
|
|
?RESOURCE_ERROR(stopped),
|
|
emqx_resource:query(?ID, get_state)
|
|
),
|
|
?assertMatch(
|
|
{ok, _, #{status := stopped}},
|
|
emqx_resource:get_instance(?ID)
|
|
),
|
|
|
|
%% start the resource manually..
|
|
ct:pal("starting resource manually"),
|
|
ok = emqx_resource:start(?ID),
|
|
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
|
?assert(is_process_alive(Pid)),
|
|
|
|
%% restart the resource
|
|
ct:pal("restarting resource"),
|
|
ok = emqx_resource:restart(?ID),
|
|
?assertNot(is_process_alive(Pid)),
|
|
{ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
|
|
?assert(is_process_alive(Pid2)),
|
|
|
|
ct:pal("removing resource"),
|
|
ok = emqx_resource:remove_local(?ID),
|
|
|
|
?assertNot(is_process_alive(Pid2)).
|
|
|
|
t_query(_) ->
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource}
|
|
),
|
|
|
|
{ok, #{pid := _}} = emqx_resource:query(?ID, get_state),
|
|
|
|
?assertMatch(
|
|
?RESOURCE_ERROR(not_found),
|
|
emqx_resource:query(<<"unknown">>, get_state)
|
|
),
|
|
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_query_counter(_) ->
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, register => true}
|
|
),
|
|
|
|
{ok, 0} = emqx_resource:query(?ID, get_counter),
|
|
ok = emqx_resource:query(?ID, {inc_counter, 1}),
|
|
{ok, 1} = emqx_resource:query(?ID, get_counter),
|
|
ok = emqx_resource:query(?ID, {inc_counter, 5}),
|
|
{ok, 6} = emqx_resource:query(?ID, get_counter),
|
|
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_batch_query_counter(_) ->
|
|
BatchSize = 100,
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, register => true},
|
|
#{batch_size => BatchSize, query_mode => sync}
|
|
),
|
|
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
emqx_resource:query(?ID, get_counter),
|
|
fun(Result, Trace) ->
|
|
?assertMatch({ok, 0}, Result),
|
|
QueryTrace = ?of_kind(call_batch_query, Trace),
|
|
?assertMatch([#{batch := [{query, _, get_counter, _}]}], QueryTrace)
|
|
end
|
|
),
|
|
|
|
NMsgs = 1_000,
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
begin
|
|
NEvents = round(math:ceil(NMsgs / BatchSize)),
|
|
{ok, SRef} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter}),
|
|
NEvents,
|
|
_Timeout = 10_000
|
|
),
|
|
inc_counter_in_parallel(NMsgs),
|
|
{ok, _} = snabbkaffe:receive_events(SRef),
|
|
ok
|
|
end,
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_batch_query, Trace),
|
|
?assertMatch([#{batch := BatchReq} | _] when length(BatchReq) > 1, QueryTrace)
|
|
end
|
|
),
|
|
{ok, NMsgs} = emqx_resource:query(?ID, get_counter),
|
|
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_query_counter_async_query(_) ->
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, register => true},
|
|
#{query_mode => async, batch_size => 1}
|
|
),
|
|
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
|
|
NMsgs = 1_000,
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
begin
|
|
{ok, SRef} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter}),
|
|
NMsgs,
|
|
_Timeout = 60_000
|
|
),
|
|
inc_counter_in_parallel(NMsgs),
|
|
{ok, _} = snabbkaffe:receive_events(SRef),
|
|
ok
|
|
end,
|
|
fun(Trace) ->
|
|
%% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
|
|
QueryTrace = ?of_kind(call_query, Trace),
|
|
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
|
|
end
|
|
),
|
|
%% simple query ignores the query_mode and batching settings in the resource_worker
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
emqx_resource:simple_sync_query(?ID, get_counter),
|
|
fun(Result, Trace) ->
|
|
?assertMatch({ok, 1000}, Result),
|
|
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
|
|
QueryTrace = ?of_kind(call_query, Trace),
|
|
?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
|
|
end
|
|
),
|
|
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
|
|
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_query_counter_async_callback(_) ->
|
|
emqx_connector_demo:set_callback_mode(async_if_possible),
|
|
|
|
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
|
|
Insert = fun(Tab, Result) ->
|
|
ets:insert(Tab, {make_ref(), Result})
|
|
end,
|
|
ReqOpts = #{async_reply_fun => {Insert, [Tab0]}},
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, register => true},
|
|
#{
|
|
query_mode => async,
|
|
batch_size => 1,
|
|
async_inflight_window => 1000000
|
|
}
|
|
),
|
|
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
|
|
NMsgs = 1_000,
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
begin
|
|
{ok, SRef} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
|
NMsgs,
|
|
_Timeout = 60_000
|
|
),
|
|
inc_counter_in_parallel(NMsgs, ReqOpts),
|
|
{ok, _} = snabbkaffe:receive_events(SRef),
|
|
ok
|
|
end,
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
|
|
end
|
|
),
|
|
|
|
%% simple query ignores the query_mode and batching settings in the resource_worker
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
emqx_resource:simple_sync_query(?ID, get_counter),
|
|
fun(Result, Trace) ->
|
|
?assertMatch({ok, 1000}, Result),
|
|
QueryTrace = ?of_kind(call_query, Trace),
|
|
?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
|
|
end
|
|
),
|
|
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
|
|
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
|
|
?assertMatch(1000, ets:info(Tab0, size)),
|
|
?assert(
|
|
lists:all(
|
|
fun
|
|
({_, ok}) -> true;
|
|
(_) -> false
|
|
end,
|
|
ets:tab2list(Tab0)
|
|
)
|
|
),
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_query_counter_async_inflight(_) ->
|
|
emqx_connector_demo:set_callback_mode(async_if_possible),
|
|
MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
|
|
ok = telemetry:attach_many(
|
|
?FUNCTION_NAME,
|
|
emqx_resource_metrics:events(),
|
|
fun(Event, Measurements, Meta, _Config) ->
|
|
ets:insert(
|
|
MetricsTab,
|
|
{erlang:monotonic_time(), #{
|
|
event => Event, measurements => Measurements, metadata => Meta
|
|
}}
|
|
),
|
|
ok
|
|
end,
|
|
unused_config
|
|
),
|
|
on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),
|
|
|
|
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
|
|
Insert0 = fun(Tab, Ref, Result) ->
|
|
ct:pal("inserting ~p", [{Ref, Result}]),
|
|
ets:insert(Tab, {Ref, Result})
|
|
end,
|
|
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
|
|
WindowSize = 15,
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, register => true},
|
|
#{
|
|
query_mode => async,
|
|
batch_size => 1,
|
|
async_inflight_window => WindowSize,
|
|
worker_pool_size => 1,
|
|
resume_interval => 300
|
|
}
|
|
),
|
|
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
|
|
|
|
%% block the resource
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
|
|
|
|
%% send async query to make the inflight window full
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
inc_counter_in_parallel(WindowSize, ReqOpts),
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
|
|
end
|
|
),
|
|
tap_metrics(?LINE),
|
|
|
|
%% this will block the resource_worker as the inflight window is full now
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
emqx_resource:query(?ID, {inc_counter, 2}),
|
|
#{?snk_kind := resource_worker_enter_blocked},
|
|
1_000
|
|
),
|
|
?assertMatch(0, ets:info(Tab0, size)),
|
|
|
|
tap_metrics(?LINE),
|
|
%% send query now will fail because the resource is blocked.
|
|
Insert = fun(Tab, Ref, Result) ->
|
|
ct:pal("inserting ~p", [{Ref, Result}]),
|
|
ets:insert(Tab, {Ref, Result}),
|
|
?tp(tmp_query_inserted, #{})
|
|
end,
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
emqx_resource:query(?ID, {inc_counter, 3}, #{
|
|
async_reply_fun => {Insert, [Tab0, tmp_query]}
|
|
}),
|
|
#{?snk_kind := tmp_query_inserted},
|
|
1_000
|
|
),
|
|
%% since this counts as a failure, it'll be enqueued and retried
|
|
%% later, when the resource is unblocked.
|
|
?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
|
|
tap_metrics(?LINE),
|
|
|
|
%% all responses should be received after the resource is resumed.
|
|
{ok, SRef0} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
|
%% +1 because the tmp_query above will be retried and succeed
|
|
%% this time.
|
|
WindowSize + 1,
|
|
_Timeout = 60_000
|
|
),
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
|
tap_metrics(?LINE),
|
|
{ok, _} = snabbkaffe:receive_events(SRef0),
|
|
%% since the previous tmp_query was enqueued to be retried, we
|
|
%% take it again from the table; this time, it should have
|
|
%% succeeded.
|
|
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
|
|
?assertEqual(WindowSize, ets:info(Tab0, size)),
|
|
tap_metrics(?LINE),
|
|
|
|
%% send async query, this time everything should be ok.
|
|
Num = 10,
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
begin
|
|
{ok, SRef} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
|
Num,
|
|
_Timeout = 60_000
|
|
),
|
|
inc_counter_in_parallel(Num, ReqOpts),
|
|
{ok, _} = snabbkaffe:receive_events(SRef),
|
|
ok
|
|
end,
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace)
|
|
end
|
|
),
|
|
?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
|
tap_metrics(?LINE),
|
|
|
|
%% block the resource
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
|
|
%% again, send async query to make the inflight window full
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
inc_counter_in_parallel(WindowSize, ReqOpts),
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_query_async, Trace),
|
|
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
|
|
end
|
|
),
|
|
|
|
%% this will block the resource_worker
|
|
ok = emqx_resource:query(?ID, {inc_counter, 1}),
|
|
|
|
Sent = WindowSize + Num + WindowSize,
|
|
{ok, SRef1} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
|
WindowSize,
|
|
_Timeout = 60_000
|
|
),
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
|
{ok, _} = snabbkaffe:receive_events(SRef1),
|
|
?assertEqual(Sent, ets:info(Tab0, size)),
|
|
|
|
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
|
|
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
|
|
?assert(Sent =< Counter),
|
|
|
|
%% give the metrics some time to stabilize.
|
|
ct:sleep(1000),
|
|
#{counters := C, gauges := G} = tap_metrics(?LINE),
|
|
?assertMatch(
|
|
#{
|
|
counters :=
|
|
#{matched := M, success := Ss, dropped := Dp},
|
|
gauges := #{queuing := Qing, inflight := Infl}
|
|
} when
|
|
M == Ss + Dp + Qing + Infl,
|
|
#{counters => C, gauges => G},
|
|
#{
|
|
metrics => #{counters => C, gauges => G},
|
|
results => ets:tab2list(Tab0),
|
|
metrics_trace => ets:tab2list(MetricsTab)
|
|
}
|
|
),
|
|
?assert(
|
|
lists:all(
|
|
fun
|
|
({_, ok}) -> true;
|
|
(_) -> false
|
|
end,
|
|
ets:tab2list(Tab0)
|
|
)
|
|
),
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_query_counter_async_inflight_batch(_) ->
|
|
emqx_connector_demo:set_callback_mode(async_if_possible),
|
|
MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
|
|
ok = telemetry:attach_many(
|
|
?FUNCTION_NAME,
|
|
emqx_resource_metrics:events(),
|
|
fun(Event, Measurements, Meta, _Config) ->
|
|
ets:insert(
|
|
MetricsTab,
|
|
{erlang:monotonic_time(), #{
|
|
event => Event, measurements => Measurements, metadata => Meta
|
|
}}
|
|
),
|
|
ok
|
|
end,
|
|
unused_config
|
|
),
|
|
on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),
|
|
|
|
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
|
|
Insert0 = fun(Tab, Ref, Result) ->
|
|
ct:pal("inserting ~p", [{Ref, Result}]),
|
|
ets:insert(Tab, {Ref, Result})
|
|
end,
|
|
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
|
|
BatchSize = 2,
|
|
WindowSize = 3,
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, register => true},
|
|
#{
|
|
query_mode => async,
|
|
batch_size => BatchSize,
|
|
async_inflight_window => WindowSize,
|
|
worker_pool_size => 1,
|
|
resume_interval => 300
|
|
}
|
|
),
|
|
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
|
|
|
|
%% block the resource
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
|
|
|
|
%% send async query to make the inflight window full
|
|
NumMsgs = BatchSize * WindowSize,
|
|
?check_trace(
|
|
begin
|
|
{ok, SRef} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := call_batch_query_async}),
|
|
WindowSize,
|
|
_Timeout = 60_000
|
|
),
|
|
inc_counter_in_parallel(NumMsgs, ReqOpts),
|
|
{ok, _} = snabbkaffe:receive_events(SRef),
|
|
ok
|
|
end,
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_batch_query_async, Trace),
|
|
?assertMatch(
|
|
[
|
|
#{
|
|
batch := [
|
|
{query, _, {inc_counter, 1}, _},
|
|
{query, _, {inc_counter, 1}, _}
|
|
]
|
|
}
|
|
| _
|
|
],
|
|
QueryTrace
|
|
)
|
|
end
|
|
),
|
|
tap_metrics(?LINE),
|
|
|
|
?check_trace(
|
|
begin
|
|
%% this will block the resource_worker as the inflight window is full now
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
emqx_resource:query(?ID, {inc_counter, 2}),
|
|
#{?snk_kind := resource_worker_enter_blocked},
|
|
5_000
|
|
),
|
|
?assertMatch(0, ets:info(Tab0, size)),
|
|
ok
|
|
end,
|
|
[]
|
|
),
|
|
|
|
tap_metrics(?LINE),
|
|
%% send query now will fail because the resource is blocked.
|
|
Insert = fun(Tab, Ref, Result) ->
|
|
ct:pal("inserting ~p", [{Ref, Result}]),
|
|
ets:insert(Tab, {Ref, Result}),
|
|
?tp(tmp_query_inserted, #{})
|
|
end,
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
emqx_resource:query(?ID, {inc_counter, 3}, #{
|
|
async_reply_fun => {Insert, [Tab0, tmp_query]}
|
|
}),
|
|
#{?snk_kind := tmp_query_inserted},
|
|
1_000
|
|
),
|
|
%% since this counts as a failure, it'll be enqueued and retried
|
|
%% later, when the resource is unblocked.
|
|
?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
|
|
tap_metrics(?LINE),
|
|
|
|
%% all responses should be received after the resource is resumed.
|
|
{ok, SRef0} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
|
%% +1 because the tmp_query above will be retried and succeed
|
|
%% this time.
|
|
WindowSize + 1,
|
|
_Timeout = 60_000
|
|
),
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
|
tap_metrics(?LINE),
|
|
{ok, _} = snabbkaffe:receive_events(SRef0),
|
|
%% since the previous tmp_query was enqueued to be retried, we
|
|
%% take it again from the table; this time, it should have
|
|
%% succeeded.
|
|
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
|
|
?assertEqual(NumMsgs, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
|
|
tap_metrics(?LINE),
|
|
|
|
%% send async query, this time everything should be ok.
|
|
NumBatches1 = 3,
|
|
NumMsgs1 = BatchSize * NumBatches1,
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
begin
|
|
{ok, SRef} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
|
NumBatches1,
|
|
_Timeout = 60_000
|
|
),
|
|
inc_counter_in_parallel(NumMsgs1, ReqOpts),
|
|
{ok, _} = snabbkaffe:receive_events(SRef),
|
|
ok
|
|
end,
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_batch_query_async, Trace),
|
|
?assertMatch(
|
|
[#{batch := [{query, _, {inc_counter, _}, _} | _]} | _],
|
|
QueryTrace
|
|
)
|
|
end
|
|
),
|
|
?assertEqual(
|
|
NumMsgs + NumMsgs1,
|
|
ets:info(Tab0, size),
|
|
#{tab => ets:tab2list(Tab0)}
|
|
),
|
|
tap_metrics(?LINE),
|
|
|
|
%% block the resource
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
|
|
%% again, send async query to make the inflight window full
|
|
?check_trace(
|
|
?TRACE_OPTS,
|
|
inc_counter_in_parallel(WindowSize, ReqOpts),
|
|
fun(Trace) ->
|
|
QueryTrace = ?of_kind(call_batch_query_async, Trace),
|
|
?assertMatch(
|
|
[#{batch := [{query, _, {inc_counter, _}, _} | _]} | _],
|
|
QueryTrace
|
|
)
|
|
end
|
|
),
|
|
|
|
%% this will block the resource_worker
|
|
ok = emqx_resource:query(?ID, {inc_counter, 1}),
|
|
|
|
Sent = NumMsgs + NumMsgs1 + WindowSize,
|
|
{ok, SRef1} = snabbkaffe:subscribe(
|
|
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
|
|
WindowSize,
|
|
_Timeout = 60_000
|
|
),
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
|
|
{ok, _} = snabbkaffe:receive_events(SRef1),
|
|
?assertEqual(Sent, ets:info(Tab0, size)),
|
|
|
|
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
|
|
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
|
|
?assert(Sent =< Counter),
|
|
|
|
%% give the metrics some time to stabilize.
|
|
ct:sleep(1000),
|
|
#{counters := C, gauges := G} = tap_metrics(?LINE),
|
|
?assertMatch(
|
|
#{
|
|
counters :=
|
|
#{matched := M, success := Ss, dropped := Dp},
|
|
gauges := #{queuing := Qing, inflight := Infl}
|
|
} when
|
|
M == Ss + Dp + Qing + Infl,
|
|
#{counters => C, gauges => G},
|
|
#{
|
|
metrics => #{counters => C, gauges => G},
|
|
results => ets:tab2list(Tab0),
|
|
metrics_trace => ets:tab2list(MetricsTab)
|
|
}
|
|
),
|
|
?assert(
|
|
lists:all(
|
|
fun
|
|
({_, ok}) -> true;
|
|
(_) -> false
|
|
end,
|
|
ets:tab2list(Tab0)
|
|
)
|
|
),
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_healthy_timeout(_) ->
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => <<"bad_not_atom_name">>, register => true},
|
|
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
|
|
#{health_check_interval => 200}
|
|
),
|
|
?assertMatch(
|
|
?RESOURCE_ERROR(not_connected),
|
|
emqx_resource:query(?ID, get_state)
|
|
),
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_healthy(_) ->
|
|
{ok, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource}
|
|
),
|
|
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
|
timer:sleep(300),
|
|
emqx_resource:set_resource_status_connecting(?ID),
|
|
|
|
{ok, connected} = emqx_resource:health_check(?ID),
|
|
?assertMatch(
|
|
[#{status := connected}],
|
|
emqx_resource:list_instances_verbose()
|
|
),
|
|
|
|
erlang:exit(Pid, shutdown),
|
|
|
|
?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)),
|
|
|
|
?assertMatch(
|
|
[#{status := disconnected}],
|
|
emqx_resource:list_instances_verbose()
|
|
),
|
|
|
|
ok = emqx_resource:remove_local(?ID).
|
|
|
|
t_stop_start(_) ->
|
|
{error, _} = emqx_resource:check_and_create(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{unknown => test_resource}
|
|
),
|
|
|
|
{ok, _} = emqx_resource:check_and_create(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{<<"name">> => <<"test_resource">>}
|
|
),
|
|
|
|
%% add some metrics to test their persistence
|
|
WorkerID0 = <<"worker:0">>,
|
|
WorkerID1 = <<"worker:1">>,
|
|
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2),
|
|
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3),
|
|
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
|
|
|
|
{ok, _} = emqx_resource:check_and_recreate(
|
|
?ID,
|
|
?TEST_RESOURCE,
|
|
#{<<"name">> => <<"test_resource">>},
|
|
#{}
|
|
),
|
|
|
|
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
|
|
|
|
?assert(is_process_alive(Pid0)),
|
|
|
|
%% metrics are reset when recreating
|
|
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
|
|
|
|
ok = emqx_resource:stop(?ID),
|
|
|
|
?assertNot(is_process_alive(Pid0)),
|
|
|
|
?assertMatch(
|
|
?RESOURCE_ERROR(stopped),
|
|
emqx_resource:query(?ID, get_state)
|
|
),
|
|
|
|
ok = emqx_resource:restart(?ID),
|
|
timer:sleep(300),
|
|
|
|
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
|
|
|
|
?assert(is_process_alive(Pid1)),
|
|
|
|
%% now stop while resetting the metrics
|
|
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
|
|
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
|
|
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
|
|
ok = emqx_resource:stop(?ID),
|
|
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
|
|
|
|
ok.
|
|
|
|
t_stop_start_local(_) ->
|
|
{error, _} = emqx_resource:check_and_create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{unknown => test_resource}
|
|
),
|
|
|
|
{ok, _} = emqx_resource:check_and_create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{<<"name">> => <<"test_resource">>}
|
|
),
|
|
|
|
{ok, _} = emqx_resource:check_and_recreate_local(
|
|
?ID,
|
|
?TEST_RESOURCE,
|
|
#{<<"name">> => <<"test_resource">>},
|
|
#{}
|
|
),
|
|
|
|
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
|
|
|
|
?assert(is_process_alive(Pid0)),
|
|
|
|
ok = emqx_resource:stop(?ID),
|
|
|
|
?assertNot(is_process_alive(Pid0)),
|
|
|
|
?assertMatch(
|
|
?RESOURCE_ERROR(stopped),
|
|
emqx_resource:query(?ID, get_state)
|
|
),
|
|
|
|
ok = emqx_resource:restart(?ID),
|
|
|
|
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
|
|
|
|
?assert(is_process_alive(Pid1)).
|
|
|
|
t_list_filter(_) ->
|
|
{ok, _} = emqx_resource:create_local(
|
|
emqx_resource:generate_id(<<"a">>),
|
|
<<"group1">>,
|
|
?TEST_RESOURCE,
|
|
#{name => a}
|
|
),
|
|
{ok, _} = emqx_resource:create_local(
|
|
emqx_resource:generate_id(<<"a">>),
|
|
<<"group2">>,
|
|
?TEST_RESOURCE,
|
|
#{name => grouped_a}
|
|
),
|
|
|
|
[Id1] = emqx_resource:list_group_instances(<<"group1">>),
|
|
?assertMatch(
|
|
{ok, <<"group1">>, #{config := #{name := a}}},
|
|
emqx_resource:get_instance(Id1)
|
|
),
|
|
|
|
[Id2] = emqx_resource:list_group_instances(<<"group2">>),
|
|
?assertMatch(
|
|
{ok, <<"group2">>, #{config := #{name := grouped_a}}},
|
|
emqx_resource:get_instance(Id2)
|
|
).
|
|
|
|
t_create_dry_run_local(_) ->
|
|
ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}),
|
|
lists:foreach(
|
|
fun(_) ->
|
|
create_dry_run_local_succ()
|
|
end,
|
|
lists:seq(1, 10)
|
|
),
|
|
case [] =:= ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of
|
|
false ->
|
|
%% Sleep to remove flakyness in test case. It take some time for
|
|
%% the ETS table to be cleared.
|
|
timer:sleep(2000),
|
|
[] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'});
|
|
true ->
|
|
ok
|
|
end.
|
|
|
|
create_dry_run_local_succ() ->
|
|
case whereis(test_resource) of
|
|
undefined -> ok;
|
|
Pid -> exit(Pid, kill)
|
|
end,
|
|
?assertEqual(
|
|
ok,
|
|
emqx_resource:create_dry_run_local(
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, register => true}
|
|
)
|
|
),
|
|
?assertEqual(undefined, whereis(test_resource)).
|
|
|
|
t_create_dry_run_local_failed(_) ->
|
|
ct:timetrap({seconds, 120}),
|
|
ct:pal("creating with creation error"),
|
|
Res1 = emqx_resource:create_dry_run_local(
|
|
?TEST_RESOURCE,
|
|
#{create_error => true}
|
|
),
|
|
?assertMatch({error, _}, Res1),
|
|
|
|
ct:pal("creating with health check error"),
|
|
Res2 = emqx_resource:create_dry_run_local(
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, health_check_error => true}
|
|
),
|
|
?assertMatch({error, _}, Res2),
|
|
|
|
ct:pal("creating with stop error"),
|
|
Res3 = emqx_resource:create_dry_run_local(
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, stop_error => true}
|
|
),
|
|
?assertEqual(ok, Res3).
|
|
|
|
t_test_func(_) ->
|
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),
|
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:min(int, 3), [4])),
|
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(array, 10), [[a, b, c, d]])),
|
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(string, 10), ["less10"])).
|
|
|
|
t_reset_metrics(_) ->
|
|
{ok, _} = emqx_resource:create(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource}
|
|
),
|
|
|
|
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
|
emqx_resource:reset_metrics(?ID),
|
|
?assert(is_process_alive(Pid)),
|
|
ok = emqx_resource:remove(?ID),
|
|
?assertNot(is_process_alive(Pid)).
|
|
|
|
t_auto_retry(_) ->
|
|
{Res, _} = emqx_resource:create_local(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource, create_error => true},
|
|
#{auto_retry_interval => 100}
|
|
),
|
|
?assertEqual(ok, Res).
|
|
|
|
t_retry_batch(_Config) ->
|
|
{ok, _} = emqx_resource:create(
|
|
?ID,
|
|
?DEFAULT_RESOURCE_GROUP,
|
|
?TEST_RESOURCE,
|
|
#{name => test_resource},
|
|
#{
|
|
query_mode => async,
|
|
batch_size => 5,
|
|
worker_pool_size => 1,
|
|
resume_interval => 1_000
|
|
}
|
|
),
|
|
|
|
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
|
|
Matched0 = emqx_resource_metrics:matched_get(?ID),
|
|
?assertEqual(1, Matched0),
|
|
|
|
%% these requests will batch together and fail; the buffer worker
|
|
%% will enter the `blocked' state and they'll be retried later,
|
|
%% after it unblocks.
|
|
Payloads = lists:seq(1, 5),
|
|
NumPayloads = length(Payloads),
|
|
ExpectedCount = 15,
|
|
|
|
?check_trace(
|
|
begin
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
lists:foreach(
|
|
fun(N) ->
|
|
ok = emqx_resource:query(?ID, {inc_counter, N})
|
|
end,
|
|
Payloads
|
|
),
|
|
#{?snk_kind := resource_worker_enter_blocked},
|
|
5_000
|
|
),
|
|
%% now the individual messages should have been counted
|
|
Matched1 = emqx_resource_metrics:matched_get(?ID),
|
|
?assertEqual(Matched0 + NumPayloads, Matched1),
|
|
|
|
%% wait for two more retries while the failure is enabled; the
|
|
%% batch shall remain enqueued.
|
|
{ok, _} =
|
|
snabbkaffe:block_until(
|
|
?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}),
|
|
5_000
|
|
),
|
|
%% should not have increased the matched count with the retries
|
|
Matched2 = emqx_resource_metrics:matched_get(?ID),
|
|
?assertEqual(Matched1, Matched2),
|
|
|
|
%% now unblock the buffer worker so it may retry the batch,
|
|
%% but it'll still fail
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
ok = emqx_resource:simple_sync_query(?ID, resume),
|
|
#{?snk_kind := resource_worker_retry_queue_batch_succeeded},
|
|
5_000
|
|
),
|
|
%% 1 more because of the `resume' call
|
|
Matched3 = emqx_resource_metrics:matched_get(?ID),
|
|
?assertEqual(Matched2 + 1, Matched3),
|
|
|
|
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
|
|
{Counter, Matched3}
|
|
end,
|
|
fun({Counter, Matched3}, Trace) ->
|
|
%% 1 original attempt + 2 failed retries + final
|
|
%% successful attempt.
|
|
%% each time should be the original batch (no duplicate
|
|
%% elements or reordering).
|
|
ExpectedSeenPayloads = lists:flatten(lists:duplicate(4, Payloads)),
|
|
?assertEqual(
|
|
ExpectedSeenPayloads,
|
|
?projection(n, ?of_kind(connector_demo_batch_inc_individual, Trace))
|
|
),
|
|
?assertMatch(
|
|
[#{n := ExpectedCount}],
|
|
?of_kind(connector_demo_inc_counter, Trace)
|
|
),
|
|
?assertEqual(ExpectedCount, Counter),
|
|
%% matched should count only the original requests, and not retries
|
|
%% + 1 for `resume' call
|
|
%% + 1 for `block' call
|
|
%% + 1 for `get_counter' call
|
|
%% and the message count (1 time)
|
|
Matched4 = emqx_resource_metrics:matched_get(?ID),
|
|
?assertEqual(Matched3 + 1, Matched4),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Helpers
|
|
%%------------------------------------------------------------------------------
|
|
inc_counter_in_parallel(N) ->
|
|
inc_counter_in_parallel(N, #{}).
|
|
|
|
inc_counter_in_parallel(N, Opts0) ->
|
|
Parent = self(),
|
|
Pids = [
|
|
erlang:spawn(fun() ->
|
|
Opts =
|
|
case is_function(Opts0) of
|
|
true -> Opts0();
|
|
false -> Opts0
|
|
end,
|
|
emqx_resource:query(?ID, {inc_counter, 1}, Opts),
|
|
Parent ! {complete, self()}
|
|
end)
|
|
|| _ <- lists:seq(1, N)
|
|
],
|
|
[
|
|
receive
|
|
{complete, Pid} -> ok
|
|
after 1000 ->
|
|
ct:fail({wait_for_query_timeout, Pid})
|
|
end
|
|
|| Pid <- Pids
|
|
].
|
|
|
|
bin_config() ->
|
|
<<"\"name\": \"test_resource\"">>.
|
|
|
|
config() ->
|
|
{ok, Config} = hocon:binary(bin_config()),
|
|
Config.
|
|
|
|
tap_metrics(Line) ->
|
|
{ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID),
|
|
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
|
|
#{counters => C, gauges => G}.
|