From 88ca25c60cbdb0bc7b3d384b7ae5576973278c2e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 31 May 2022 19:54:44 +0800 Subject: [PATCH] fix(resource): fast return when starting a unavailable resource --- apps/emqx/include/http_api.hrl | 2 + apps/emqx_bridge/src/emqx_bridge_api.erl | 27 ++- apps/emqx_bridge/src/emqx_bridge_resource.erl | 7 +- .../test/emqx_bridge_api_SUITE.erl | 5 +- .../src/mqtt/emqx_connector_mqtt_worker.erl | 1 + apps/emqx_resource/src/emqx_resource.erl | 4 +- .../src/emqx_resource_manager.erl | 184 +++++++++++------- .../test/emqx_resource_SUITE.erl | 9 +- 8 files changed, 153 insertions(+), 86 deletions(-) diff --git a/apps/emqx/include/http_api.hrl b/apps/emqx/include/http_api.hrl index 08c17f274..7cd16d338 100644 --- a/apps/emqx/include/http_api.hrl +++ b/apps/emqx/include/http_api.hrl @@ -49,6 +49,7 @@ %% Internal error -define(INTERNAL_ERROR, 'INTERNAL_ERROR'). +-define(SERVICE_UNAVAILABLE, 'SERVICE_UNAVAILABLE'). -define(SOURCE_ERROR, 'SOURCE_ERROR'). -define(UPDATE_FAILED, 'UPDATE_FAILED'). -define(REST_FAILED, 'REST_FAILED'). @@ -81,6 +82,7 @@ {'TOPIC_NOT_FOUND', <<"Topic not found">>}, {'USER_NOT_FOUND', <<"User not found">>}, {'INTERNAL_ERROR', <<"Server inter error">>}, + {'SERVICE_UNAVAILABLE', <<"Service unavailable">>}, {'SOURCE_ERROR', <<"Source error">>}, {'UPDATE_FAILED', <<"Update failed">>}, {'REST_FAILED', <<"Reset source or config failed">>}, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8cc45c911..bdbbb787f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -321,7 +321,8 @@ schema("/bridges/:id") -> parameters => [param_path_id()], responses => #{ 204 => <<"Bridge deleted">>, - 400 => error_schema(['INVALID_ID'], "Update bridge failed") + 400 => error_schema(['INVALID_ID'], "Update bridge failed"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } }; @@ -352,6 +353,7 @@ schema("/bridges/:id/operation/:operation") -> ], responses => #{ 200 => <<"Operation success">>, + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"), 400 => error_schema('INVALID_ID', "Bad bridge ID") } } @@ -371,7 +373,8 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> responses => #{ 200 => <<"Operation success">>, 400 => error_schema('INVALID_ID', "Bad bridge ID"), - 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation") + 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } }. @@ -417,6 +420,7 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> Id, case emqx_bridge:remove(BridgeType, BridgeName) of {ok, _} -> {204}; + {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} end ). @@ -466,6 +470,10 @@ lookup_from_local_node(BridgeType, BridgeName) -> {200}; {error, {pre_config_update, _, bridge_not_found}} -> {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; + {error, {_, _, timeout}} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} end; @@ -489,11 +497,18 @@ lookup_from_local_node(BridgeType, BridgeName) -> ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]), case maps:get(enable, ConfMap, false) of false -> - {403, error_msg('FORBIDDEN_REQUEST', <<"forbidden operation">>)}; + {403, + error_msg( + 'FORBIDDEN_REQUEST', <<"forbidden operation: bridge disabled">> + )}; true -> case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of - ok -> {200}; - {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} + ok -> + {200}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end end end @@ -518,6 +533,8 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of {ok, _} -> {200}; + {error, [timeout | _]} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, ErrL} -> {500, error_msg('INTERNAL_ERROR', ErrL)} end. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 5af164b33..0ff04c4f5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -133,8 +133,11 @@ update(Type, Name, {OldConf, Conf}) -> %% we don't need to recreate the bridge if this config change is only to %% toggole the config 'bridge.{type}.{name}.enable' case maps:get(enable, Conf, true) of - true -> restart(Type, Name); - false -> stop(Type, Name) + true -> + _ = restart(Type, Name), + ok; + false -> + stop(Type, Name) end end. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index c5d8d924c..c048a13fe 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -378,7 +378,10 @@ t_enable_disable_bridges(_) -> {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), {ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>), - ?assertEqual(<<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation\"}">>, Res), + ?assertEqual( + <<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation: bridge disabled\"}">>, + Res + ), %% enable a stopped bridge {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 1ff4e7f6a..db795a4cf 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -573,6 +573,7 @@ obfuscate(Map) -> ). is_sensitive(password) -> true; +is_sensitive(ssl_opts) -> true; is_sensitive(_) -> false. str(A) when is_atom(A) -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index f5b03e76b..5e3242135 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -260,8 +260,10 @@ query(InstId, Request, AfterQuery) -> emqx_metrics_worker:inc(resource_metrics, InstId, exception), erlang:raise(Err, Reason, ST) end; + {ok, _Group, _Data} -> + query_error(not_found, <<"resource not connected">>); {error, not_found} -> - query_error(not_found, <<"resource not found or not connected">>) + query_error(not_found, <<"resource not found">>) end. -spec restart(instance_id()) -> ok | {error, Reason :: term()}. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index e0d1e4e70..8d0d984a2 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -23,19 +23,24 @@ % API -export([ ensure_resource/5, - create_dry_run/2, - ets_lookup/1, - get_metrics/1, - health_check/1, - list_all/0, - list_group/1, - lookup/1, + create/5, recreate/4, remove/1, - reset_metrics/1, + create_dry_run/2, restart/2, - set_resource_status_connecting/1, - stop/1 + start/2, + stop/1, + health_check/1 +]). + +-export([ + lookup/1, + list_all/0, + list_group/1, + ets_lookup/1, + get_metrics/1, + reset_metrics/1, + set_resource_status_connecting/1 ]). % Server @@ -51,6 +56,8 @@ -define(HEALTHCHECK_INTERVAL, 15000). -define(ETS_TABLE, emqx_resource_manager). -define(WAIT_FOR_RESOURCE_DELAY, 100). +-define(T_OPERATION, 5000). +-define(T_LOOKUP, 1000). -define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected). @@ -74,11 +81,24 @@ ensure_resource(InstId, Group, ResourceType, Config, Opts) -> {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> - do_start(InstId, Group, ResourceType, Config, Opts), + create(InstId, Group, ResourceType, Config, Opts), {ok, _Group, Data} = lookup(InstId), {ok, Data} end. +%% @doc Create a resource_manager and wait until it is running +create(InstId, Group, ResourceType, Config, Opts) -> + % The state machine will make the actual call to the callback/resource module after init + ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts), + ok = emqx_metrics_worker:create_metrics( + resource_metrics, + InstId, + [matched, success, failed, exception], + [matched] + ), + wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + ok. + %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% %% Triggers the `emqx_resource_manager_sup` supervisor to actually create @@ -90,9 +110,9 @@ create_dry_run(ResourceType, Config) -> ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}), case wait_for_resource_ready(InstId, 5000) of ok -> - _ = stop(InstId); + _ = remove(InstId); timeout -> - _ = stop(InstId), + _ = remove(InstId), {error, timeout} end. @@ -118,27 +138,36 @@ remove(InstId) when is_binary(InstId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}. remove(InstId, ClearMetrics) when is_binary(InstId) -> - safe_call(InstId, {remove, ClearMetrics}). + safe_call(InstId, {remove, ClearMetrics}, ?T_OPERATION). %% @doc Stops and then starts an instance that was already running -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. restart(InstId, Opts) when is_binary(InstId) -> - case lookup(InstId) of - {ok, Group, #{mod := ResourceType, config := Config} = _Data} -> - _ = remove(InstId, false), - do_start(InstId, Group, ResourceType, Config, Opts); - Error -> + case safe_call(InstId, restart, ?T_OPERATION) of + ok -> + wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + ok; + {error, _Reason} = Error -> Error end. -%% @doc Stop the resource manager process +%% @doc Stop the resource +-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +start(InstId, Opts) -> + case safe_call(InstId, start, ?T_OPERATION) of + ok -> + wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + ok; + {error, _Reason} = Error -> + Error + end. + +%% @doc Start the resource -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> - case safe_call(InstId, stop) of + case safe_call(InstId, stop, ?T_OPERATION) of ok -> ok; - {error, not_found} -> - ok; {error, _Reason} = Error -> Error end. @@ -146,12 +175,15 @@ stop(InstId) -> %% @doc Test helper -spec set_resource_status_connecting(instance_id()) -> ok. set_resource_status_connecting(InstId) -> - safe_call(InstId, set_resource_status_connecting). + safe_call(InstId, set_resource_status_connecting, infinity). %% @doc Lookup the group and data of a resource -spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup(InstId) -> - safe_call(InstId, lookup). + case safe_call(InstId, lookup, ?T_LOOKUP) of + {error, timeout} -> ets_lookup(InstId); + Result -> Result + end. %% @doc Lookup the group and data of a resource -spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. @@ -192,7 +224,7 @@ list_group(Group) -> -spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}. health_check(InstId) -> - safe_call(InstId, health_check). + safe_call(InstId, health_check, ?T_OPERATION). %% Server start/stop callbacks @@ -204,13 +236,16 @@ start_link(InstId, Group, ResourceType, Config, Opts) -> mod = ResourceType, config = Config, opts = Opts, - status = undefined, + status = connecting, state = undefined, error = undefined }, gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). init(Data) -> + process_flag(trap_exit, true), + %% init the cache so that lookup/1 will always return something + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), {ok, connecting, Data, {next_event, internal, try_connect}}. terminate(_Reason, _State, Data) -> @@ -227,11 +262,20 @@ callback_mode() -> [handle_event_function, state_enter]. % Called during testing to force a specific state handle_event({call, From}, set_resource_status_connecting, _State, Data) -> {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; +% Called when the resource is to be restarted +handle_event({call, From}, restart, _State, Data) -> + _ = stop_resource(Data), + start_resource(Data, From); +% Called when the resource is to be started +handle_event({call, From}, start, _State, #data{status = disconnected} = Data) -> + start_resource(Data, From); +handle_event({call, From}, start, _State, _Data) -> + {keep_state_and_data, [{reply, From, ok}]}; % Called when the resource is to be stopped handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) -> {next_state, stopped, Data, [{reply, From, ok}]}; handle_event({call, From}, stop, _State, Data) -> - Result = do_stop(Data), + Result = stop_resource(Data), UpdatedData = Data#data{status = disconnected}, {next_state, stopped, UpdatedData, [{reply, From, Result}]}; % Called when a resource is to be stopped and removed. @@ -243,9 +287,9 @@ handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> {keep_state_and_data, [{reply, From, Reply}]}; % Connecting state enter handle_event(internal, try_connect, connecting, Data) -> - handle_connection_attempt(Data); + start_resource(Data, undefined); handle_event(enter, _OldState, connecting, Data) -> - ets:delete(?ETS_TABLE, Data#data.id), + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), Actions = [{state_timeout, 0, health_check}], {next_state, connecting, Data, Actions}; % Connecting state health_check timeouts. @@ -261,15 +305,16 @@ handle_event(enter, _OldState, connected, Data) -> handle_event(state_timeout, health_check, connected, Data) -> perform_connected_health_check(Data); handle_event(enter, _OldState, disconnected, Data) -> + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), handle_disconnected_state_enter(Data); handle_event(state_timeout, auto_retry, disconnected, Data) -> - handle_connection_attempt(Data); + start_resource(Data, undefined); handle_event(enter, _OldState, stopped, Data) -> UpdatedData = Data#data{status = disconnected}, - ets:delete(?ETS_TABLE, Data#data.id), + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), {next_state, stopped, UpdatedData}; % Resource has been explicitly stopped, so return that as the error reason. -handle_event({call, From}, _, stopped, _Data) -> +handle_event({call, From}, health_check, stopped, _Data) -> Actions = [{reply, From, {error, resource_is_stopped}}], {keep_state_and_data, Actions}; handle_event({call, From}, health_check, _State, Data) -> @@ -293,56 +338,43 @@ handle_event(EventType, EventData, State, Data) -> %%------------------------------------------------------------------------------ handle_disconnected_state_enter(Data) -> - UpdatedData = Data#data{status = disconnected}, - ets:delete(?ETS_TABLE, Data#data.id), case maps:get(auto_retry_interval, Data#data.opts, undefined) of undefined -> - {next_state, disconnected, UpdatedData}; + {next_state, disconnected, Data}; RetryInterval -> Actions = [{state_timeout, RetryInterval, auto_retry}], - {next_state, disconnected, UpdatedData, Actions} - end. - -handle_connection_attempt(Data) -> - case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of - {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState, status = connecting}, - %% Perform an initial health_check immediately before transitioning into a connected state - Actions = [{state_timeout, 0, health_check}], - {next_state, connecting, UpdatedData, Actions}; - {error, Reason} -> - %% Keep track of the error reason why the connection did not work - %% so that the Reason can be returned when the verification call is made. - UpdatedData = Data#data{status = disconnected, error = Reason}, - {next_state, disconnected, UpdatedData} + {next_state, disconnected, Data, Actions} end. handle_remove_event(From, ClearMetrics, Data) -> - do_stop(Data), - ets:delete(?ETS_TABLE, Data#data.id), + stop_resource(Data), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id); false -> ok end, {stop_and_reply, normal, [{reply, From, ok}]}. -do_start(InstId, Group, ResourceType, Config, Opts) -> - % The state machine will make the actual call to the callback/resource module after init - ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts), - ok = emqx_metrics_worker:create_metrics( - resource_metrics, - InstId, - [matched, success, failed, exception], - [matched] - ), - wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), - ok. +start_resource(Data, From) -> + %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), + case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of + {ok, ResourceState} -> + UpdatedData = Data#data{state = ResourceState, status = connecting}, + %% Perform an initial health_check immediately before transitioning into a connected state + Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), + {next_state, connecting, UpdatedData, Actions}; + {error, Reason} = Err -> + %% Keep track of the error reason why the connection did not work + %% so that the Reason can be returned when the verification call is made. + UpdatedData = Data#data{status = disconnected, error = Reason}, + Actions = maybe_reply([], From, Err), + {next_state, disconnected, UpdatedData, Actions} + end. -do_stop(#data{state = undefined} = _Data) -> +stop_resource(#data{state = undefined} = _Data) -> ok; -do_stop(Data) -> +stop_resource(Data) -> Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), - ets:delete(?ETS_TABLE, Data#data.id), _ = maybe_clear_alarm(Data#data.id), Result. @@ -388,10 +420,11 @@ with_health_check(Data, Func) -> ResId = Data#data.id, HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state), - _ = maybe_alarm_resource_down(connected, ResId), + _ = maybe_alarm_resource_down(Status, ResId), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, + ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}), Func(Status, UpdatedData). maybe_alarm_resource_down(connected, _ResId) -> @@ -417,6 +450,11 @@ parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) -> {Status, NewState, Error}. +maybe_reply(Actions, undefined, _Reply) -> + Actions; +maybe_reply(Actions, From, Reply) -> + [{reply, From, Reply} | Actions]. + data_record_to_external_map_with_metrics(Data) -> #{ id => Data#data.id, @@ -446,10 +484,12 @@ do_wait_for_resource_ready(InstId, Retry) -> do_wait_for_resource_ready(InstId, Retry - 1) end. -safe_call(InstId, Message) -> +safe_call(InstId, Message, Timeout) -> try - gen_statem:call(proc_name(InstId), Message) + gen_statem:call(proc_name(InstId), Message, {clean_timeout, Timeout}) catch - exit:_ -> - {error, not_found} + exit:{R, _} when R == noproc; R == normal; R == shutdown -> + {error, not_found}; + exit:{timeout, _} -> + {error, timeout} end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 77d0e57c3..2519bc7d3 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include("emqx_resource.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -define(TEST_RESOURCE, emqx_test_resource). -define(ID, <<"id">>). @@ -183,7 +184,6 @@ t_healthy(_) -> emqx_resource:set_resource_status_connecting(?ID), {ok, connected} = emqx_resource:health_check(?ID), - ?assertMatch( [#{status := connected}], emqx_resource:list_instances_verbose() @@ -194,7 +194,7 @@ t_healthy(_) -> ?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)), ?assertMatch( - [], + [#{status := connecting}], emqx_resource:list_instances_verbose() ), @@ -236,7 +236,6 @@ t_stop_start(_) -> ), ok = emqx_resource:restart(?ID), - timer:sleep(300), #{pid := Pid1} = emqx_resource:query(?ID, get_state), @@ -334,11 +333,11 @@ t_create_dry_run_local_failed(_) -> ), ?assertEqual(error, Res2), - {Res3, _} = emqx_resource:create_dry_run_local( + Res3 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{name => test_resource, stop_error => true} ), - ?assertEqual(error, Res3). + ?assertEqual(ok, Res3). t_test_func(_) -> ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),