diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl index 7c429784e..f97a0110f 100644 --- a/apps/emqx_connector/include/emqx_connector.hrl +++ b/apps/emqx_connector/include/emqx_connector.hrl @@ -33,3 +33,5 @@ The """ ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if '[:Port]' is ). -define(THROW_ERROR(Str), erlang:throw({error, Str})). + +-define(CONNECTOR_RESOURCE_GROUP, <<"emqx_connector">>). diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl index dacb5dab0..920150dd2 100644 --- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl @@ -24,6 +24,7 @@ -include_lib("stdlib/include/assert.hrl"). -define(PGSQL_HOST, "pgsql"). +-define(PGSQL_RESOURCE_MOD, emqx_connector_pgsql). all() -> emqx_common_test_helpers:all(?MODULE). @@ -34,111 +35,89 @@ groups() -> init_per_suite(Config) -> case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of true -> - ok = emqx_connector_test_helpers:start_apps([ecpool, pgsql]), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]), Config; false -> {skip, no_pgsql} end. end_per_suite(_Config) -> - ok = emqx_connector_test_helpers:stop_apps([ecpool, pgsql]). + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource, emqx_connector]). init_per_testcase(_, Config) -> - ?assertEqual( - {ok, #{poolname => emqx_connector_pgsql}}, - emqx_connector_pgsql:on_start(<<"emqx_connector_pgsql">>, pgsql_config()) - ), Config. end_per_testcase(_, _Config) -> - ?assertEqual( - ok, - emqx_connector_pgsql:on_stop(<<"emqx_connector_pgsql">>, #{poolname => emqx_connector_pgsql}) - ). + ok. % %%------------------------------------------------------------------------------ % %% Testcases % %%------------------------------------------------------------------------------ -% Simple test to make sure the proper reference to the module is returned. -t_roots(_Config) -> - ExpectedRoots = [{config, #{type => {ref, emqx_connector_pgsql, config}}}], - ActualRoots = emqx_connector_pgsql:roots(), - ?assertEqual(ExpectedRoots, ActualRoots). - -% Not sure if this level of testing is appropriate for this function. -% Checking the actual values/types of the returned term starts getting -% into checking the emqx_connector_schema_lib.erl returns and the shape -% of expected data elsewhere. -t_fields(_Config) -> - Fields = emqx_connector_pgsql:fields(config), - lists:foreach( - fun({FieldName, FieldValue}) -> - ?assert(is_atom(FieldName)), - if - is_map(FieldValue) -> - ?assert(maps:is_key(type, FieldValue) and maps:is_key(default, FieldValue)); - true -> - ?assert(is_function(FieldValue)) - end - end, - Fields +t_lifecycle(_Config) -> + perform_lifecycle_check( + <<"emqx_connector_pgsql_SUITE">>, + pgsql_config() ). -% Execute a minimal query to validate connection. -t_basic_query(_Config) -> - ?assertMatch( - {ok, _, [{1}]}, - emqx_connector_pgsql:on_query( - <<"emqx_connector_pgsql">>, {query, test_query()}, undefined, #{ - poolname => emqx_connector_pgsql - } - ) - ). - -% Perform health check. -t_do_healthcheck(_Config) -> - ?assertEqual( - {ok, #{poolname => emqx_connector_pgsql}}, - emqx_connector_pgsql:on_health_check(<<"emqx_connector_pgsql">>, #{ - poolname => emqx_connector_pgsql - }) - ). - -% Perform healthcheck on a connector that does not exist. -t_healthceck_when_connector_does_not_exist(_Config) -> - ?assertEqual( - {error, health_check_failed, #{poolname => emqx_connector_pgsql_does_not_exist}}, - emqx_connector_pgsql:on_health_check(<<"emqx_connector_pgsql_does_not_exist">>, #{ - poolname => emqx_connector_pgsql_does_not_exist - }) - ). +perform_lifecycle_check(PoolName, InitialConfig) -> + {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig), + {ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( + PoolName, + ?CONNECTOR_RESOURCE_GROUP, + ?PGSQL_RESOURCE_MOD, + CheckedConfig + ), + ?assertEqual(InitialStatus, started), + % Instance should match the state and status of the just started resource + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), + % % Perform query as further check that the resource is working as expected + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())), + ?assertEqual(ok, emqx_resource:stop(PoolName)), + % Resource will be listed still, but state will be changed and healthcheck will fail + % as the worker no longer exists. + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(StoppedStatus, stopped), + ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), + % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + % Can call stop/1 again on an already stopped instance + ?assertEqual(ok, emqx_resource:stop(PoolName)), + % Make sure it can be restarted and the healthchecks and queries work properly + ?assertEqual(ok, emqx_resource:restart(PoolName)), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())), + % Stop and remove the resource in one go. + ?assertEqual(ok, emqx_resource:remove_local(PoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + % Should not even be able to get the resource data out of ets now unlike just stopping. + ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). % %%------------------------------------------------------------------------------ % %% Helpers % %%------------------------------------------------------------------------------ pgsql_config() -> - #{ - auto_reconnect => true, - database => <<"mqtt">>, - username => <<"root">>, - password => <<"public">>, - pool_size => 8, - server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT}, - ssl => #{enable => false} - }. + RawConfig = list_to_binary(io_lib:format(""" + auto_reconnect = true + database = mqtt + username= root + password = public + pool_size = 8 + server = \"~s:~b\" + """, [?PGSQL_HOST, ?PGSQL_DEFAULT_PORT])), -pgsql_bad_config() -> - #{ - auto_reconnect => true, - database => <<"bad_mqtt">>, - username => <<"bad_root">>, - password => <<"bad_public">>, - pool_size => 8, - server => {?PGSQL_HOST, ?PGSQL_DEFAULT_PORT}, - ssl => #{enable => false} - }. + {ok, Config} = hocon:binary(RawConfig), + #{<<"config">> => Config}. -test_query() -> - <<"SELECT 1">>. +test_query_no_params() -> + {query, <<"SELECT 1">>}. + +test_query_with_params() -> + {query, <<"SELECT $1::integer">>, [1]}.