From 2fb42e4d37442b141e1b0259bc71ae5807502704 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 28 Jul 2022 17:40:04 +0800 Subject: [PATCH] refactor: create emqx_resource_worker_sup for resource workers --- .../emqx_authn_jwks_connector.erl | 12 +- .../src/simple_authn/emqx_authn_mongodb.erl | 2 +- .../src/emqx_connector_http.erl | 29 ++-- .../src/emqx_connector_ldap.erl | 9 +- .../src/emqx_connector_mongo.erl | 10 +- .../src/emqx_connector_mqtt.erl | 10 +- .../src/emqx_connector_mysql.erl | 17 +-- .../src/emqx_connector_pgsql.erl | 13 +- .../src/emqx_connector_redis.erl | 9 +- apps/emqx_resource/include/emqx_resource.hrl | 14 +- apps/emqx_resource/src/emqx_resource.erl | 32 +---- .../src/emqx_resource_manager.erl | 15 +- apps/emqx_resource/src/emqx_resource_sup.erl | 85 ++--------- .../src/emqx_resource_worker.erl | 132 ++++++++++------- .../src/emqx_resource_worker_sup.erl | 136 ++++++++++++++++++ .../test/emqx_resource_SUITE.erl | 77 +++++----- .../emqx_resource/test/emqx_test_resource.erl | 27 ++-- 17 files changed, 332 insertions(+), 297 deletions(-) create mode 100644 apps/emqx_resource/src/emqx_resource_worker_sup.erl diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl index 8f98e2f1e..cd8451ac9 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl @@ -24,7 +24,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2, connect/1 ]). @@ -45,7 +45,7 @@ on_start(InstId, Opts) -> on_stop(_InstId, #{pool_name := PoolName}) -> emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) -> +on_query(InstId, get_jwks, #{pool_name := PoolName}) -> Result = ecpool:pick_and_do(PoolName, {emqx_authn_jwks_client, get_jwks, []}, no_handover), case Result of {error, Reason} -> @@ -54,20 +54,18 @@ on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) -> connector => InstId, command => get_jwks, reason => Reason - }), - emqx_resource:query_failed(AfterQuery); + }); _ -> - emqx_resource:query_success(AfterQuery) + ok end, Result; -on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) -> +on_query(_InstId, {update, Opts}, #{pool_name := PoolName}) -> lists:foreach( fun({_, Worker}) -> ok = ecpool_worker:exec(Worker, {emqx_authn_jwks_client, update, [Opts]}, infinity) end, ecpool:workers(PoolName) ), - emqx_resource:query_success(AfterQuery), ok. on_get_status(_InstId, #{pool_name := PoolName}) -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index f7249ae57..ff9c97717 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -174,7 +174,7 @@ authenticate( reason => Reason }), ignore; - Doc -> + {ok, Doc} -> case check_password(Password, Doc, State) of ok -> {ok, is_superuser(Doc, State)}; diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 59b4ddffa..e0d5ccfe0 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -28,7 +28,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -225,7 +225,7 @@ on_stop(InstId, #{pool_name := PoolName}) -> }), ehttpc_sup:stop_pool(PoolName). -on_query(InstId, {send_message, Msg}, AfterQuery, State) -> +on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of undefined -> ?SLOG(error, #{msg => "request_not_found", connector => InstId}); @@ -241,18 +241,16 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) -> on_query( InstId, {undefined, Method, {Path, Headers, Body}, Timeout, Retry}, - AfterQuery, State ) end; -on_query(InstId, {Method, Request}, AfterQuery, State) -> - on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State); -on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> - on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State); +on_query(InstId, {Method, Request}, State) -> + on_query(InstId, {undefined, Method, Request, 5000, 2}, State); +on_query(InstId, {Method, Request, Timeout}, State) -> + on_query(InstId, {undefined, Method, Request, Timeout, 2}, State); on_query( InstId, {KeyOrNum, Method, Request, Timeout, Retry}, - AfterQuery, #{pool_name := PoolName, base_path := BasePath} = State ) -> ?TRACE( @@ -275,32 +273,29 @@ on_query( of {error, Reason} -> ?SLOG(error, #{ - msg => "http_connector_do_reqeust_failed", + msg => "http_connector_do_request_failed", request => NRequest, reason => Reason, connector => InstId - }), - emqx_resource:query_failed(AfterQuery); + }); {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> - emqx_resource:query_success(AfterQuery); + ok; {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> - emqx_resource:query_success(AfterQuery); + ok; {ok, StatusCode, _} -> ?SLOG(error, #{ msg => "http connector do request, received error response", request => NRequest, connector => InstId, status_code => StatusCode - }), - emqx_resource:query_failed(AfterQuery); + }); {ok, StatusCode, _, _} -> ?SLOG(error, #{ msg => "http connector do request, received error response", request => NRequest, connector => InstId, status_code => StatusCode - }), - emqx_resource:query_failed(AfterQuery) + }) end, Result. diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 195aa89a9..51d18b534 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -27,7 +27,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -99,7 +99,7 @@ on_stop(InstId, #{poolname := PoolName}) -> }), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) -> +on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = State) -> Request = {Base, Filter, Attributes}, ?TRACE( "QUERY", @@ -119,10 +119,9 @@ on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := P request => Request, connector => InstId, reason => Reason - }), - emqx_resource:query_failed(AfterQuery); + }); _ -> - emqx_resource:query_success(AfterQuery) + ok end, Result. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 5b07c5003..db8b1e632 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -27,7 +27,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -189,7 +189,6 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query( InstId, {Action, Collection, Filter, Projector}, - AfterQuery, #{poolname := PoolName} = State ) -> Request = {Action, Collection, Filter, Projector}, @@ -212,14 +211,11 @@ on_query( reason => Reason, connector => InstId }), - emqx_resource:query_failed(AfterQuery), {error, Reason}; {ok, Cursor} when is_pid(Cursor) -> - emqx_resource:query_success(AfterQuery), - mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000); + {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)}; Result -> - emqx_resource:query_success(AfterQuery), - Result + {ok, Result} end. -dialyzer({nowarn_function, [on_get_status/2]}). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 21e201504..98635de3f 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -37,7 +37,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -181,12 +181,12 @@ on_stop(_InstId, #{name := InstanceId}) -> }) end. -on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) -> - emqx_resource:query_success(AfterQuery); -on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> +on_query(_InstId, {message_received, _Msg}, _State) -> + ok; +on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), - emqx_resource:query_success(AfterQuery). + ok. on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> AutoReconn = maps:get(auto_reconnect, Conf, true), diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 409da4060..e818bc6ef 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -26,7 +26,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -122,14 +122,13 @@ on_stop(InstId, #{poolname := PoolName}) -> }), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {TypeOrKey, SQLOrKey}, AfterQuery, State) -> - on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, AfterQuery, State); -on_query(InstId, {TypeOrKey, SQLOrKey, Params}, AfterQuery, State) -> - on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, AfterQuery, State); +on_query(InstId, {TypeOrKey, SQLOrKey}, State) -> + on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State); +on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) -> + on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State); on_query( InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, - AfterQuery, #{poolname := PoolName, prepare_statement := Prepares} = State ) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, @@ -147,7 +146,6 @@ on_query( ), %% kill the poll worker to trigger reconnection _ = exit(Conn, restart), - emqx_resource:query_failed(AfterQuery), Result; {error, not_prepared} -> ?SLOG( @@ -157,13 +155,12 @@ on_query( case prepare_sql(Prepares, PoolName) of ok -> %% not return result, next loop will try again - on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, AfterQuery, State); + on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State); {error, Reason} -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason} ), - emqx_resource:query_failed(AfterQuery), {error, Reason} end; {error, Reason} -> @@ -171,10 +168,8 @@ on_query( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} ), - emqx_resource:query_failed(AfterQuery), Result; _ -> - emqx_resource:query_success(AfterQuery), Result end. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 6f89e7ff1..d31c1316f 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -29,7 +29,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -116,9 +116,9 @@ on_stop(InstId, #{poolname := PoolName}) -> }), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {Type, NameOrSQL}, AfterQuery, #{poolname := _PoolName} = State) -> - on_query(InstId, {Type, NameOrSQL, []}, AfterQuery, State); -on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> +on_query(InstId, {Type, NameOrSQL}, #{poolname := _PoolName} = State) -> + on_query(InstId, {Type, NameOrSQL, []}, State); +on_query(InstId, {Type, NameOrSQL, Params}, #{poolname := PoolName} = State) -> ?SLOG(debug, #{ msg => "postgresql connector received sql query", connector => InstId, @@ -132,10 +132,9 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} connector => InstId, sql => NameOrSQL, reason => Reason - }), - emqx_resource:query_failed(AfterQuery); + }); _ -> - emqx_resource:query_success(AfterQuery) + ok end, Result. diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 67310dbac..4826a170b 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -28,7 +28,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -177,7 +177,7 @@ on_stop(InstId, #{poolname := PoolName, type := Type}) -> _ -> emqx_plugin_libs_pool:stop_pool(PoolName) end. -on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> +on_query(InstId, {cmd, Command}, #{poolname := PoolName, type := Type} = State) -> ?TRACE( "QUERY", "redis_connector_received", @@ -195,10 +195,9 @@ on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := T connector => InstId, sql => Command, reason => Reason - }), - emqx_resource:query_failed(AfterCommand); + }); _ -> - emqx_resource:query_success(AfterCommand) + ok end, Result. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index d6f959510..13ffff587 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -21,7 +21,7 @@ -type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). --type resource_status() :: connected | disconnected | connecting. +-type resource_status() :: connected | disconnected | connecting | stopped. -type resource_data() :: #{ id := resource_id(), mod := module(), @@ -45,13 +45,11 @@ %% periodically. auto_retry_interval => integer() }. --type after_query() :: - {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} - | undefined. - -%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback -%% actions upon query failure --type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. +-type query_result() :: + ok + | {ok, term()} + | {error, term()} + | {resource_down, term()}. -define(TEST_ID_PREFIX, "_test_:"). -define(RES_METRICS, resource_metrics). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 793b9f446..081264315 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -23,13 +23,6 @@ -export([list_types/0]). -%% APIs for behaviour implementations - --export([ - query_success/1, - query_failed/1 -]). - %% APIs for instances -export([ @@ -113,7 +106,8 @@ -export([inc_metrics_funcs/1, inc_success/1, inc_failed/1]). -optional_callbacks([ - on_query/4, + on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -125,7 +119,9 @@ -callback on_stop(resource_id(), resource_state()) -> term(). %% when calling emqx_resource:query/3 --callback on_query(resource_id(), Request :: term(), after_query(), resource_state()) -> term(). +-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result(). + +-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). %% when calling emqx_resource:health_check/2 -callback on_get_status(resource_id(), resource_state()) -> @@ -149,22 +145,6 @@ is_resource_mod(Module) -> proplists:get_value(behaviour, Info, []), lists:member(?MODULE, Behaviour). --spec query_success(after_query()) -> ok. -query_success(undefined) -> ok; -query_success({OnSucc, _}) -> exec_query_after_calls(OnSucc). - --spec query_failed(after_query()) -> ok. -query_failed(undefined) -> ok; -query_failed({_, OnFailed}) -> exec_query_after_calls(OnFailed). - -exec_query_after_calls(Funcs) -> - lists:foreach( - fun({Fun, Arg}) -> - emqx_resource_utils:safe_exec(Fun, Arg) - end, - Funcs - ). - %% ================================================================================= %% APIs for resource instances %% ================================================================================= @@ -247,7 +227,7 @@ query(ResId, Request) -> emqx_resource_worker:query(ResId, Request). -spec query_async(resource_id(), Request :: term(), emqx_resource_worker:reply_fun()) -> - ok. + Result :: term(). query_async(ResId, Request, ReplyFun) -> emqx_resource_worker:query_async(ResId, Request, ReplyFun). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index b8a3812b5..b5bcbd330 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -53,7 +53,7 @@ -define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(HEALTHCHECK_INTERVAL, 15000). --define(ETS_TABLE, emqx_resource_manager). +-define(ETS_TABLE, ?MODULE). -define(WAIT_FOR_RESOURCE_DELAY, 100). -define(T_OPERATION, 5000). -define(T_LOOKUP, 1000). @@ -114,9 +114,9 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> [matched, success, failed, exception, resource_down], [matched] ), + ok = emqx_resource_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, true) of true -> - ok = emqx_resource_sup:start_workers(ResId, Opts), wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)); false -> ok @@ -317,7 +317,7 @@ handle_event({call, From}, health_check, _State, Data) -> handle_manually_health_check(From, Data); % State: CONNECTING handle_event(enter, _OldState, connecting, Data) -> - UpdatedData = Data#data{status = connected}, + UpdatedData = Data#data{status = connecting}, insert_cache(Data#data.id, Data#data.group, Data), Actions = [{state_timeout, 0, health_check}], {keep_state, UpdatedData, Actions}; @@ -332,7 +332,7 @@ handle_event(enter, _OldState, connected, Data) -> UpdatedData = Data#data{status = connected}, insert_cache(Data#data.id, Data#data.group, UpdatedData), _ = emqx_alarm:deactivate(Data#data.id), - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {next_state, connected, UpdatedData, Actions}; handle_event(state_timeout, health_check, connected, Data) -> handle_connected_health_check(Data); @@ -423,7 +423,7 @@ handle_remove_event(From, ClearMetrics, Data) -> true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, - ok = emqx_resource_sup:stop_workers(Data#data.id, Data#data.opts), + ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), {stop_and_reply, normal, [{reply, From, ok}]}. start_resource(Data, From) -> @@ -487,7 +487,7 @@ handle_connected_health_check(Data) -> Data, fun (connected, UpdatedData) -> - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state, UpdatedData, Actions}; (Status, UpdatedData) -> ?SLOG(error, #{ @@ -510,6 +510,9 @@ with_health_check(Data, Func) -> insert_cache(ResId, UpdatedData#data.group, UpdatedData), Func(Status, UpdatedData). +health_check_interval(Opts) -> + maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). + maybe_alarm(connected, _ResId) -> ok; maybe_alarm(_Status, <>) -> diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index 14db50d01..920743101 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -19,7 +19,7 @@ -behaviour(supervisor). --export([start_link/0, start_workers/2, stop_workers/2]). +-export([start_link/0]). -export([init/1]). @@ -29,7 +29,6 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, Metrics = emqx_metrics_worker:child_spec(?RES_METRICS), - ResourceManager = #{ id => emqx_resource_manager_sup, @@ -39,79 +38,11 @@ init([]) -> type => supervisor, modules => [emqx_resource_manager_sup] }, - {ok, {SupFlags, [Metrics, ResourceManager]}}. - -start_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), - _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]), - lists:foreach( - fun(Idx) -> - _ = ensure_worker_added(ResId, {ResId, Idx}, Idx), - ok = ensure_worker_started(ResId, Idx, Opts) - end, - lists:seq(1, PoolSize) - ). - -stop_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), - lists:foreach( - fun(Idx) -> - ok = ensure_worker_stopped(ResId, Idx), - ok = ensure_worker_removed(ResId, {ResId, Idx}) - end, - lists:seq(1, PoolSize) - ), - _ = gproc_pool:delete(ResId), - ok. - -pool_size(Opts) -> - maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)). - -ensure_worker_pool(Pool, Type, Opts) -> - try - gproc_pool:new(Pool, Type, Opts) - catch - error:exists -> ok - end, - ok. - -ensure_worker_added(Pool, Name, Slot) -> - try - gproc_pool:add_worker(Pool, Name, Slot) - catch - error:exists -> ok - end, - ok. - -ensure_worker_removed(Pool, Name) -> - _ = gproc_pool:remove_worker(Pool, Name), - ok. - --define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}). -ensure_worker_started(ResId, Idx, Opts) -> - Mod = emqx_resource_worker, - Spec = #{ - id => ?CHILD_ID(Mod, ResId, Idx), - start => {Mod, start_link, [ResId, Idx, Opts]}, - restart => transient, - shutdown => 5000, - type => worker, - modules => [Mod] + WorkerSup = #{ + id => emqx_resource_worker_sup, + start => {emqx_resource_worker_sup, start_link, []}, + restart => permanent, + shutdown => infinity, + type => supervisor }, - case supervisor:start_child(emqx_resource_sup, Spec) of - {ok, _Pid} -> ok; - {error, {already_started, _}} -> ok; - {error, already_present} -> ok; - {error, _} = Err -> Err - end. - -ensure_worker_stopped(ResId, Idx) -> - ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx), - case supervisor:terminate_child(emqx_resource_sup, ChildId) of - ok -> - supervisor:delete_child(emqx_resource_sup, ChildId); - {error, not_found} -> - ok; - {error, Reason} -> - {error, Reason} - end. + {ok, {SupFlags, [Metrics, ResourceManager, WorkerSup]}}. diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 7acf2d0f9..ae0d24313 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -44,6 +44,8 @@ -export([running/3, blocked/3]). +-export([queue_item_marshaller/1]). + -define(RESUME_INTERVAL, 15000). %% count @@ -51,11 +53,15 @@ %% milliseconds -define(DEFAULT_BATCH_TIME, 10). +-define(Q_ITEM(REQUEST), {q_item, REQUEST}). + -define(QUERY(FROM, REQUEST), {FROM, REQUEST}). -define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}). -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). --define(RESOURCE_ERROR(Reason, Msg), {error, {resource_error, #{reason => Reason, msg => Msg}}}). +-define(RESOURCE_ERROR(Reason, Msg), + {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} +). -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). -type id() :: binary(). @@ -72,21 +78,21 @@ callback_mode() -> [state_functions, state_enter]. start_link(Id, Index, Opts) -> gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). --spec query(id(), request()) -> ok. +-spec query(id(), request()) -> Result :: term(). query(Id, Request) -> - gen_statem:call(pick(Id, self()), {query, Request}). + query(Id, self(), Request). --spec query(id(), term(), request()) -> ok. -query(Id, Key, Request) -> - gen_statem:call(pick(Id, Key), {query, Request}). +-spec query(id(), term(), request()) -> Result :: term(). +query(Id, PickKey, Request) -> + pick_query(call, Id, PickKey, {query, Request}). --spec query_async(id(), request(), reply_fun()) -> ok. +-spec query_async(id(), request(), reply_fun()) -> Result :: term(). query_async(Id, Request, ReplyFun) -> - gen_statem:cast(pick(Id, self()), {query, Request, ReplyFun}). + query_async(Id, self(), Request, ReplyFun). --spec query_async(id(), term(), request(), reply_fun()) -> ok. -query_async(Id, Key, Request, ReplyFun) -> - gen_statem:cast(pick(Id, Key), {query, Request, ReplyFun}). +-spec query_async(id(), term(), request(), reply_fun()) -> Result :: term(). +query_async(Id, PickKey, Request, ReplyFun) -> + pick_query(cast, Id, PickKey, {query, Request, ReplyFun}). -spec block(pid() | atom()) -> ok. block(ServerRef) -> @@ -97,17 +103,24 @@ resume(ServerRef) -> gen_statem:cast(ServerRef, resume). init({Id, Index, Opts}) -> + process_flag(trap_exit, true), true = gproc_pool:connect_worker(Id, {Id, Index}), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), Queue = case maps:get(queue_enabled, Opts, true) of - true -> replayq:open(#{dir => disk_queue_dir(Id, Index), seg_bytes => 10000000}); - false -> undefined + true -> + replayq:open(#{ + dir => disk_queue_dir(Id, Index), + seg_bytes => 10000000, + marshaller => fun ?MODULE:queue_item_marshaller/1 + }); + false -> + undefined end, St = #{ id => Id, index => Index, - batch_enabled => maps:get(batch_enabled, Opts, true), + batch_enabled => maps:get(batch_enabled, Opts, false), batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, @@ -128,7 +141,7 @@ running(cast, {query, Request, ReplyFun}, St) -> running({call, From}, {query, Request}, St) -> query_or_acc(From, Request, St); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> - {keep_state, flush(St#{tref := undefined})}; + flush(St#{tref := undefined}); running(info, {flush, _Ref}, _St) -> keep_state_and_data; running(info, Info, _St) -> @@ -154,12 +167,21 @@ terminate(_Reason, #{id := Id, index := Index}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +queue_item_marshaller(?Q_ITEM(_) = I) -> + term_to_binary(I); +queue_item_marshaller(Bin) when is_binary(Bin) -> + binary_to_term(Bin). + %%============================================================================== -pick(Id, Key) -> - Pid = gproc_pool:pick_worker(Id, Key), - case is_pid(Pid) of - true -> Pid; - false -> error({failed_to_pick_worker, {Id, Key}}) +pick_query(Fun, Id, Key, Query) -> + try gproc_pool:pick_worker(Id, Key) of + Pid when is_pid(Pid) -> + gen_statem:Fun(Pid, Query); + _ -> + ?RESOURCE_ERROR(not_created, "resource not found") + catch + error:badarg -> + ?RESOURCE_ERROR(not_created, "resource not found") end. do_resume(#{queue := undefined} = St) -> @@ -168,9 +190,9 @@ do_resume(#{queue := Q, id := Id} = St) -> case replayq:peek(Q) of empty -> {next_state, running, St}; - First -> + ?Q_ITEM(First) -> Result = call_query(Id, First), - case handle_query_result(Id, false, Result) of + case handle_query_result(Id, Result, false) of %% Send failed because resource down true -> {keep_state, St, {state_timeout, ?RESUME_INTERVAL, resume}}; @@ -182,6 +204,11 @@ do_resume(#{queue := Q, id := Id} = St) -> end end. +handle_blocked(From, Request, #{id := Id, queue := Q} = St) -> + Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), + _ = reply_caller(Id, ?REPLY(From, Request, Error), false), + {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}. + drop_head(Q) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), @@ -196,26 +223,21 @@ acc_query(From, Request, #{acc := Acc, acc_left := Left} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of - true -> {keep_state, flush(St)}; + true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end. send_query(From, Request, #{id := Id, queue := Q} = St) -> Result = call_query(Id, Request), - case reply_caller(Id, Q, ?REPLY(From, Request, Result)) of + case reply_caller(Id, ?REPLY(From, Request, Result), false) of true -> - {keep_state, St#{queue := maybe_append_queue(Q, [Request])}}; + {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}; false -> - {next_state, blocked, St} + {keep_state, St} end. -handle_blocked(From, Request, #{id := Id, queue := Q} = St) -> - Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, Q, ?REPLY(From, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Q, [Request])}}. - flush(#{acc := []} = St) -> - St; + {keep_state, St}; flush( #{ id := Id, @@ -228,65 +250,65 @@ flush( St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, BatchResults) of true -> - Q1 = maybe_append_queue(Q0, [Request || ?QUERY(_, Request) <- Batch]), - {keep_state, St1#{queue := Q1}}; + Q1 = maybe_append_queue(Q0, [?Q_ITEM(Request) || ?QUERY(_, Request) <- Batch]), + {next_state, blocked, St1#{queue := Q1}}; false -> - {next_state, blocked, St1} + {keep_state, St1} end. -maybe_append_queue(undefined, _Query) -> undefined; -maybe_append_queue(Q, Query) -> replayq:append(Q, Query). +maybe_append_queue(undefined, _Request) -> undefined; +maybe_append_queue(Q, Request) -> replayq:append(Q, Request). batch_reply_caller(Id, BatchResults) -> lists:foldl( fun(Reply, BlockWorker) -> - reply_caller(Id, BlockWorker, Reply) + reply_caller(Id, Reply, BlockWorker) end, false, BatchResults ). -reply_caller(Id, BlockWorker, ?REPLY(undefined, _, Result)) -> - handle_query_result(Id, BlockWorker, Result); -reply_caller(Id, BlockWorker, ?REPLY({ReplyFun, Args}, _, Result)) -> +reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> + handle_query_result(Id, Result, BlockWorker); +reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> ?SAFE_CALL(ReplyFun(Result, Args)), - handle_query_result(Id, BlockWorker, Result); -reply_caller(Id, BlockWorker, ?REPLY(From, _, Result)) -> + handle_query_result(Id, Result, BlockWorker); +reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> gen_statem:reply(From, Result), - handle_query_result(Id, BlockWorker, Result). + handle_query_result(Id, Result, BlockWorker). -handle_query_result(Id, BlockWorker, ok) -> +handle_query_result(Id, ok, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, success), BlockWorker; -handle_query_result(Id, BlockWorker, {ok, _}) -> +handle_query_result(Id, {ok, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, success), BlockWorker; -handle_query_result(Id, BlockWorker, ?RESOURCE_ERROR_M(exception, _)) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, exception), BlockWorker; -handle_query_result(_Id, _, ?RESOURCE_ERROR_M(NotWorking, _)) when +handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when NotWorking == not_connected; NotWorking == blocked -> true; -handle_query_result(_Id, BlockWorker, ?RESOURCE_ERROR_M(_, _)) -> +handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) -> BlockWorker; -handle_query_result(Id, BlockWorker, {error, _}) -> +handle_query_result(Id, {error, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, failed), BlockWorker; -handle_query_result(Id, _BlockWorker, {resource_down, _}) -> +handle_query_result(Id, {resource_down, _}, _BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down), true. call_query(Id, Request) -> - do_call_query(on_query, Id, Request). + do_call_query(on_query, Id, Request, 1). call_batch_query(Id, Batch) -> - do_call_query(on_batch_query, Id, Batch). + do_call_query(on_batch_query, Id, Batch, length(Batch)). -do_call_query(Fun, Id, Data) -> +do_call_query(Fun, Id, Data, Count) -> case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Data)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, Count), try Mod:Fun(Id, Data, ResourceState) of %% if the callback module (connector) wants to return an error that %% makes the current resource goes into the `error` state, it should diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl new file mode 100644 index 000000000..a2b3a1ba5 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl @@ -0,0 +1,136 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_resource_worker_sup). +-behaviour(supervisor). + +%%%============================================================================= +%%% Exports and Definitions +%%%============================================================================= + +%% External API +-export([start_link/0]). + +-export([start_workers/2, stop_workers/2]). + +%% Callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%%============================================================================= +%%% API +%%%============================================================================= + +-spec start_link() -> supervisor:startlink_ret(). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%%============================================================================= +%%% Callbacks +%%%============================================================================= + +-spec init(list()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}} | ignore. +init([]) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 100, + period => 30 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +start_workers(ResId, Opts) -> + PoolSize = pool_size(Opts), + _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]), + lists:foreach( + fun(Idx) -> + _ = ensure_worker_added(ResId, Idx), + ok = ensure_worker_started(ResId, Idx, Opts) + end, + lists:seq(1, PoolSize) + ). + +stop_workers(ResId, Opts) -> + PoolSize = pool_size(Opts), + lists:foreach( + fun(Idx) -> + ensure_worker_removed(ResId, Idx) + end, + lists:seq(1, PoolSize) + ), + ensure_worker_pool_removed(ResId), + ok. + +%%%============================================================================= +%%% Internal +%%%============================================================================= +pool_size(Opts) -> + maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)). + +ensure_worker_pool(ResId, Type, Opts) -> + try + gproc_pool:new(ResId, Type, Opts) + catch + error:exists -> ok + end, + ok. + +ensure_worker_added(ResId, Idx) -> + try + gproc_pool:add_worker(ResId, {ResId, Idx}, Idx) + catch + error:exists -> ok + end, + ok. + +-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}). +ensure_worker_started(ResId, Idx, Opts) -> + Mod = emqx_resource_worker, + Spec = #{ + id => ?CHILD_ID(Mod, ResId, Idx), + start => {Mod, start_link, [ResId, Idx, Opts]}, + restart => transient, + shutdown => 5000, + type => worker, + modules => [Mod] + }, + case supervisor:start_child(emqx_resource_sup, Spec) of + {ok, _Pid} -> ok; + {error, {already_started, _}} -> ok; + {error, already_present} -> ok; + {error, _} = Err -> Err + end. + +ensure_worker_removed(ResId, Idx) -> + ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx), + case supervisor:terminate_child(emqx_resource_sup, ChildId) of + ok -> + Res = supervisor:delete_child(emqx_resource_sup, ChildId), + _ = gproc_pool:remove_worker(ResId, {ResId, Idx}), + Res; + {error, not_found} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +ensure_worker_pool_removed(ResId) -> + try + gproc_pool:delete(ResId) + catch + error:badarg -> ok + end, + ok. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 51e6bac43..915c59611 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -26,6 +26,7 @@ -define(TEST_RESOURCE, emqx_test_resource). -define(ID, <<"id">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>). +-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). all() -> emqx_common_test_helpers:all(?MODULE). @@ -80,7 +81,7 @@ t_create_remove(_) -> #{name => test_resource}, #{} ), - #{pid := Pid} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid)), @@ -110,7 +111,7 @@ t_create_remove_local(_) -> #{name => test_resource}, #{} ), - #{pid := Pid} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid)), @@ -127,7 +128,7 @@ t_create_remove_local(_) -> {error, _} = emqx_resource:remove_local(?ID), ?assertMatch( - {error, {emqx_resource, #{reason := not_found}}}, + ?RESOURCE_ERROR(not_created), emqx_resource:query(?ID, get_state) ), ?assertNot(is_process_alive(Pid)). @@ -143,23 +144,23 @@ t_do_not_start_after_created(_) -> %% the resource should remain `disconnected` after created timer:sleep(200), ?assertMatch( - {error, {emqx_resource, #{reason := not_connected}}}, + ?RESOURCE_ERROR(stopped), emqx_resource:query(?ID, get_state) ), ?assertMatch( - {ok, _, #{status := disconnected}}, + {ok, _, #{status := stopped}}, emqx_resource:get_instance(?ID) ), %% start the resource manually.. ok = emqx_resource:start(?ID), - #{pid := Pid} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid)), %% restart the resource ok = emqx_resource:restart(?ID), ?assertNot(is_process_alive(Pid)), - #{pid := Pid2} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid2)), ok = emqx_resource:remove_local(?ID), @@ -174,23 +175,10 @@ t_query(_) -> #{name => test_resource} ), - Pid = self(), - Success = fun() -> Pid ! success end, - Failure = fun() -> Pid ! failure end, - - #{pid := _} = emqx_resource:query(?ID, get_state), - #{pid := _} = emqx_resource:query(?ID, get_state, {[{Success, []}], [{Failure, []}]}), - #{pid := _} = emqx_resource:query(?ID, get_state, undefined), - #{pid := _} = emqx_resource:query(?ID, get_state_failed, undefined), - - receive - Message -> ?assertEqual(success, Message) - after 100 -> - ?assert(false) - end, + {ok, #{pid := _}} = emqx_resource:query(?ID, get_state), ?assertMatch( - {error, {emqx_resource, #{reason := not_found}}}, + ?RESOURCE_ERROR(not_created), emqx_resource:query(<<"unknown">>, get_state) ), @@ -201,11 +189,14 @@ t_healthy_timeout(_) -> ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, - #{name => <<"test_resource">>}, - #{health_check_timeout => 200} + #{name => <<"bad_not_atom_name">>, register => true}, + %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. + #{health_check_interval => 200} + ), + ?assertMatch( + ?RESOURCE_ERROR(not_connected), + emqx_resource:query(?ID, get_state) ), - timer:sleep(500), - ok = emqx_resource:remove_local(?ID). t_healthy(_) -> @@ -213,11 +204,9 @@ t_healthy(_) -> ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, - #{name => <<"test_resource">>} + #{name => test_resource} ), - timer:sleep(400), - - #{pid := Pid} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), timer:sleep(300), emqx_resource:set_resource_status_connecting(?ID), @@ -229,10 +218,10 @@ t_healthy(_) -> erlang:exit(Pid, shutdown), - ?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)), + ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)), ?assertMatch( - [#{status := connecting}], + [#{status := disconnected}], emqx_resource:list_instances_verbose() ), @@ -260,7 +249,7 @@ t_stop_start(_) -> #{} ), - #{pid := Pid0} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid0)), @@ -269,14 +258,14 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := not_connected}}}, + ?RESOURCE_ERROR(stopped), emqx_resource:query(?ID, get_state) ), ok = emqx_resource:restart(?ID), timer:sleep(300), - #{pid := Pid1} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid1)). @@ -302,7 +291,7 @@ t_stop_start_local(_) -> #{} ), - #{pid := Pid0} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid0)), @@ -311,13 +300,13 @@ t_stop_start_local(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := not_connected}}}, + ?RESOURCE_ERROR(stopped), emqx_resource:query(?ID, get_state) ), ok = emqx_resource:restart(?ID), - #{pid := Pid1} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid1)). @@ -368,17 +357,17 @@ create_dry_run_local_succ() -> ?assertEqual(undefined, whereis(test_resource)). t_create_dry_run_local_failed(_) -> - {Res1, _} = emqx_resource:create_dry_run_local( + Res1 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, - #{cteate_error => true} + #{create_error => true} ), - ?assertEqual(error, Res1), + ?assertMatch({error, _}, Res1), - {Res2, _} = emqx_resource:create_dry_run_local( + Res2 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{name => test_resource, health_check_error => true} ), - ?assertEqual(error, Res2), + ?assertMatch({error, _}, Res2), Res3 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, @@ -400,7 +389,7 @@ t_reset_metrics(_) -> #{name => test_resource} ), - #{pid := Pid} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), emqx_resource:reset_metrics(?ID), ?assert(is_process_alive(Pid)), ok = emqx_resource:remove(?ID), diff --git a/apps/emqx_resource/test/emqx_test_resource.erl b/apps/emqx_resource/test/emqx_test_resource.erl index c23f87d50..569579d27 100644 --- a/apps/emqx_resource/test/emqx_test_resource.erl +++ b/apps/emqx_resource/test/emqx_test_resource.erl @@ -24,7 +24,7 @@ -export([ on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -50,24 +50,20 @@ on_start(_InstId, #{create_error := true}) -> error("some error"); on_start(InstId, #{name := Name, stop_error := true} = Opts) -> Register = maps:get(register, Opts, false), - {ok, #{ - name => Name, + {ok, Opts#{ id => InstId, stop_error => true, pid => spawn_dummy_process(Name, Register) }}; -on_start(InstId, #{name := Name, health_check_error := true} = Opts) -> +on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), - {ok, #{ - name => Name, + {ok, Opts#{ id => InstId, - health_check_error => true, pid => spawn_dummy_process(Name, Register) }}; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), - {ok, #{ - name => Name, + {ok, Opts#{ id => InstId, pid => spawn_dummy_process(Name, Register) }}. @@ -78,12 +74,10 @@ on_stop(_InstId, #{pid := Pid}) -> erlang:exit(Pid, shutdown), ok. -on_query(_InstId, get_state, AfterQuery, State) -> - emqx_resource:query_success(AfterQuery), - State; -on_query(_InstId, get_state_failed, AfterQuery, State) -> - emqx_resource:query_failed(AfterQuery), - State. +on_query(_InstId, get_state, State) -> + {ok, State}; +on_query(_InstId, get_state_failed, State) -> + {error, State}. on_get_status(_InstId, #{health_check_error := true}) -> disconnected; @@ -91,10 +85,11 @@ on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), case is_process_alive(Pid) of true -> connected; - false -> connecting + false -> disconnected end. spawn_dummy_process(Name, Register) -> + ct:pal("---- Register Name: ~p", [Name]), spawn( fun() -> true =