emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl

717 lines
21 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_resource.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TEST_RESOURCE, emqx_connector_demo).
-define(ID, <<"id">>).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_testcase(_, Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
Config.
end_per_testcase(_, _Config) ->
_ = emqx_resource:remove(?ID).
init_per_suite(Config) ->
code:ensure_loaded(?TEST_RESOURCE),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
{ok, _} = application:ensure_all_started(emqx_resource),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
t_list_types(_) ->
?assert(lists:member(?TEST_RESOURCE, emqx_resource:list_types())).
t_check_config(_) ->
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, bin_config()),
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, config()),
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, <<"not a config">>),
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}).
t_create_remove(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
),
{ok, _} = emqx_resource:recreate(
?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
ok = emqx_resource:remove(?ID),
{error, _} = emqx_resource:remove(?ID),
?assertNot(is_process_alive(Pid)).
t_create_remove_local(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
),
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
),
emqx_resource:recreate_local(
?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
emqx_resource:set_resource_status_connecting(?ID),
emqx_resource:recreate_local(
?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
),
ok = emqx_resource:remove_local(?ID),
{error, _} = emqx_resource:remove_local(?ID),
?assertMatch(
?RESOURCE_ERROR(not_found),
emqx_resource:query(?ID, get_state)
),
?assertNot(is_process_alive(Pid)).
t_do_not_start_after_created(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{start_after_created => false}
),
%% the resource should remain `disconnected` after created
timer:sleep(200),
?assertMatch(
?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state)
),
?assertMatch(
{ok, _, #{status := stopped}},
emqx_resource:get_instance(?ID)
),
%% start the resource manually..
ok = emqx_resource:start(?ID),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
%% restart the resource
ok = emqx_resource:restart(?ID),
?assertNot(is_process_alive(Pid)),
{ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid2)),
ok = emqx_resource:remove_local(?ID),
?assertNot(is_process_alive(Pid2)).
t_query(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
),
{ok, #{pid := _}} = emqx_resource:query(?ID, get_state),
?assertMatch(
?RESOURCE_ERROR(not_found),
emqx_resource:query(<<"unknown">>, get_state)
),
ok = emqx_resource:remove_local(?ID).
t_query_counter(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true}
),
{ok, 0} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:query(?ID, {inc_counter, 1}),
{ok, 1} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:query(?ID, {inc_counter, 5}),
{ok, 6} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
t_batch_query_counter(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{enable_batch => true, query_mode => sync}
),
?check_trace(
?TRACE_OPTS,
emqx_resource:query(?ID, get_counter),
fun(Result, Trace) ->
?assertMatch({ok, 0}, Result),
QueryTrace = ?of_kind(call_batch_query, Trace),
?assertMatch([#{batch := [{query, _, get_counter, _}]}], QueryTrace)
end
),
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(1000),
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query, Trace),
?assertMatch([#{batch := BatchReq} | _] when length(BatchReq) > 1, QueryTrace)
end
),
{ok, 1000} = emqx_resource:query(?ID, get_counter),
ok = emqx_resource:remove_local(?ID).
t_query_counter_async_query(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{query_mode => async, enable_batch => false}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(1000),
fun(Trace) ->
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
%% wait for 1s to make sure all the aysnc query is sent to the resource.
timer:sleep(1000),
%% simple query ignores the query_mode and batching settings in the resource_worker
?check_trace(
?TRACE_OPTS,
emqx_resource:simple_sync_query(?ID, get_counter),
fun(Result, Trace) ->
?assertMatch({ok, 1000}, Result),
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
end
),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
ok = emqx_resource:remove_local(?ID).
t_query_counter_async_callback(_) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
Insert = fun(Tab, Result) ->
ets:insert(Tab, {make_ref(), Result})
end,
ReqOpts = #{async_reply_fun => {Insert, [Tab0]}},
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{query_mode => async, enable_batch => false, async_inflight_window => 1000000}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(1000, ReqOpts),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
%% wait for 1s to make sure all the aysnc query is sent to the resource.
timer:sleep(1000),
%% simple query ignores the query_mode and batching settings in the resource_worker
?check_trace(
?TRACE_OPTS,
emqx_resource:simple_sync_query(?ID, get_counter),
fun(Result, Trace) ->
?assertMatch({ok, 1000}, Result),
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
end
),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
?assertMatch(1000, ets:info(Tab0, size)),
?assert(
lists:all(
fun
({_, ok}) -> true;
(_) -> false
end,
ets:tab2list(Tab0)
)
),
ok = emqx_resource:remove_local(?ID).
t_query_counter_async_inflight(_) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
Insert0 = fun(Tab, Result) ->
ets:insert(Tab, {make_ref(), Result})
end,
ReqOpts = #{async_reply_fun => {Insert0, [Tab0]}},
WindowSize = 15,
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, register => true},
#{
query_mode => async,
enable_batch => false,
async_inflight_window => WindowSize,
worker_pool_size => 1,
resume_interval => 300,
enable_queue => false
}
),
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
%% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% send async query to make the inflight window full
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(WindowSize, ReqOpts),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
%% this will block the resource_worker as the inflight window is full now
ok = emqx_resource:query(?ID, {inc_counter, 1}),
?assertMatch(0, ets:info(Tab0, size)),
%% sleep to make the resource_worker resume some times
timer:sleep(2000),
%% send query now will fail because the resource is blocked.
Insert = fun(Tab, Ref, Result) ->
ets:insert(Tab, {Ref, Result})
end,
ok = emqx_resource:query(?ID, {inc_counter, 1}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]}
}),
timer:sleep(100),
?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
%% all response should be received after the resource is resumed.
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
timer:sleep(1000),
?assertEqual(WindowSize, ets:info(Tab0, size)),
%% send async query, this time everything should be ok.
Num = 10,
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(Num, ReqOpts),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
timer:sleep(1000),
?assertEqual(WindowSize + Num, ets:info(Tab0, size)),
%% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(WindowSize, ReqOpts),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
%% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 1}),
Sent = WindowSize + Num + WindowSize,
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
timer:sleep(1000),
?assertEqual(Sent, ets:info(Tab0, size)),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
?assert(Sent =< Counter),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
ct:pal("metrics: ~p", [C]),
{ok, IncorrectStatusCount} = emqx_resource:simple_sync_query(?ID, get_incorrect_status_count),
%% The `simple_sync_query' we just did also increases the matched
%% count, hence the + 1.
ExtraSimpleCallCount = IncorrectStatusCount + 1,
?assertMatch(
#{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when
M == Ss + Dp - Rs + ExtraSimpleCallCount,
C,
#{
metrics => C,
extra_simple_call_count => ExtraSimpleCallCount
}
),
?assert(
lists:all(
fun
({_, ok}) -> true;
(_) -> false
end,
ets:tab2list(Tab0)
)
),
ok = emqx_resource:remove_local(?ID).
t_healthy_timeout(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"bad_not_atom_name">>, register => true},
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
#{health_check_interval => 200}
),
?assertMatch(
?RESOURCE_ERROR(not_connected),
emqx_resource:query(?ID, get_state)
),
ok = emqx_resource:remove_local(?ID).
t_healthy(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
timer:sleep(300),
emqx_resource:set_resource_status_connecting(?ID),
{ok, connected} = emqx_resource:health_check(?ID),
?assertMatch(
[#{status := connected}],
emqx_resource:list_instances_verbose()
),
erlang:exit(Pid, shutdown),
?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)),
?assertMatch(
[#{status := disconnected}],
emqx_resource:list_instances_verbose()
),
ok = emqx_resource:remove_local(?ID).
t_stop_start(_) ->
{error, _} = emqx_resource:check_and_create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
),
{ok, _} = emqx_resource:check_and_create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}
),
%% add some metrics to test their persistence
emqx_resource_metrics:batching_change(?ID, 5),
?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
{ok, _} = emqx_resource:check_and_recreate(
?ID,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>},
#{}
),
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid0)),
%% metrics are reset when recreating
?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),
ok = emqx_resource:stop(?ID),
?assertNot(is_process_alive(Pid0)),
?assertMatch(
?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state)
),
ok = emqx_resource:restart(?ID),
timer:sleep(300),
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1)),
%% now stop while resetting the metrics
emqx_resource_metrics:batching_change(?ID, 5),
?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
ok = emqx_resource:stop(?ID),
?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),
ok.
t_stop_start_local(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
),
{ok, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}
),
{ok, _} = emqx_resource:check_and_recreate_local(
?ID,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>},
#{}
),
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid0)),
ok = emqx_resource:stop(?ID),
?assertNot(is_process_alive(Pid0)),
?assertMatch(
?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state)
),
ok = emqx_resource:restart(?ID),
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1)).
t_list_filter(_) ->
{ok, _} = emqx_resource:create_local(
emqx_resource:generate_id(<<"a">>),
<<"group1">>,
?TEST_RESOURCE,
#{name => a}
),
{ok, _} = emqx_resource:create_local(
emqx_resource:generate_id(<<"a">>),
<<"group2">>,
?TEST_RESOURCE,
#{name => grouped_a}
),
[Id1] = emqx_resource:list_group_instances(<<"group1">>),
?assertMatch(
{ok, <<"group1">>, #{config := #{name := a}}},
emqx_resource:get_instance(Id1)
),
[Id2] = emqx_resource:list_group_instances(<<"group2">>),
?assertMatch(
{ok, <<"group2">>, #{config := #{name := grouped_a}}},
emqx_resource:get_instance(Id2)
).
t_create_dry_run_local(_) ->
ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}),
lists:foreach(
fun(_) ->
create_dry_run_local_succ()
end,
lists:seq(1, 10)
),
[] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}).
create_dry_run_local_succ() ->
case whereis(test_resource) of
undefined -> ok;
Pid -> exit(Pid, kill)
end,
?assertEqual(
ok,
emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{name => test_resource, register => true}
)
),
?assertEqual(undefined, whereis(test_resource)).
t_create_dry_run_local_failed(_) ->
Res1 = emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{create_error => true}
),
?assertMatch({error, _}, Res1),
Res2 = emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{name => test_resource, health_check_error => true}
),
?assertMatch({error, _}, Res2),
Res3 = emqx_resource:create_dry_run_local(
?TEST_RESOURCE,
#{name => test_resource, stop_error => true}
),
?assertEqual(ok, Res3).
t_test_func(_) ->
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),
?assertEqual(ok, erlang:apply(emqx_resource_validator:min(int, 3), [4])),
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(array, 10), [[a, b, c, d]])),
?assertEqual(ok, erlang:apply(emqx_resource_validator:max(string, 10), ["less10"])).
t_reset_metrics(_) ->
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
emqx_resource:reset_metrics(?ID),
?assert(is_process_alive(Pid)),
ok = emqx_resource:remove(?ID),
?assertNot(is_process_alive(Pid)).
t_auto_retry(_) ->
{Res, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, create_error => true},
#{auto_retry_interval => 100}
),
?assertEqual(ok, Res).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
inc_counter_in_parallel(N) ->
inc_counter_in_parallel(N, #{}).
inc_counter_in_parallel(N, Opts) ->
Parent = self(),
Pids = [
erlang:spawn(fun() ->
emqx_resource:query(?ID, {inc_counter, 1}, Opts),
Parent ! {complete, self()}
end)
|| _ <- lists:seq(1, N)
],
[
receive
{complete, Pid} -> ok
after 1000 ->
ct:fail({wait_for_query_timeout, Pid})
end
|| Pid <- Pids
].
bin_config() ->
<<"\"name\": \"test_resource\"">>.
config() ->
{ok, Config} = hocon:binary(bin_config()),
Config.