diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 802427134..8b44846cf 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -398,9 +398,8 @@ on_sql_query( ?TRACE("QUERY", "mysql_connector_received", LogMeta), Worker = ecpool:get_client(PoolName), {ok, Conn} = ecpool_worker:client(Worker), - Result = erlang:apply(mysql, SQLFunc, [Conn, SQLOrKey, Data, Timeout]), - case Result of - {error, disconnected} -> + try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of + {error, disconnected} = Result -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} @@ -421,12 +420,19 @@ on_sql_query( LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} ), {error, {recoverable_error, Reason}}; - {error, Reason} -> + {error, Reason} = Result -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} ), Result; - _ -> + Result -> Result + catch + error:badarg -> + ?SLOG( + error, + LogMeta#{msg => "mysql_connector_invalid_params", params => Data} + ), + {error, {invalid_params, Data}} end. diff --git a/apps/emqx_resource/include/emqx_resource_errors.hrl b/apps/emqx_resource/include/emqx_resource_errors.hrl index b11ee3c1a..6d1b3e92f 100644 --- a/apps/emqx_resource/include/emqx_resource_errors.hrl +++ b/apps/emqx_resource/include/emqx_resource_errors.hrl @@ -15,6 +15,6 @@ %%-------------------------------------------------------------------- -define(RESOURCE_ERROR(Reason, Msg), - {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} + {error, {resource_error, #{reason => Reason, msg => Msg}}} ). -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index c5841b7ef..a451939b6 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -374,18 +374,21 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> gen_statem:reply(From, Result), handle_query_result(Id, Result, BlockWorker). -handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) -> + ?SLOG(error, #{msg => resource_exception, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when NotWorking == not_connected; NotWorking == blocked -> true; -handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, _), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) -> + ?SLOG(error, #{msg => resource_not_found, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), BlockWorker; -handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, _), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) -> + ?SLOG(error, #{msg => resource_stopped, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), BlockWorker; @@ -394,18 +397,21 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; -handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) -> +handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) -> %% the message will be queued in replayq or inflight window, %% i.e. the counter 'queuing' will increase, so we pretend that we have not %% sent this message. + ?SLOG(warning, #{msg => recoverable_error, reason => Reason}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), true; -handle_query_result(Id, {error, _}, BlockWorker) -> +handle_query_result(Id, {error, Reason}, BlockWorker) -> + ?SLOG(error, #{msg => send_error, reason => Reason}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; -handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> +handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) -> + ?SLOG(error, #{msg => async_send_error, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; handle_query_result(_Id, {async_return, ok}, BlockWorker) -> @@ -433,7 +439,7 @@ call_query(QM0, Id, Query, QueryOpts) -> ?RESOURCE_ERROR(not_found, "resource not found") end. --define(APPLY_RESOURCE(EXPR, REQ), +-define(APPLY_RESOURCE(NAME, EXPR, REQ), try %% if the callback module (connector) wants to return an error that %% makes the current resource goes into the `blocked` state, it should @@ -441,12 +447,13 @@ call_query(QM0, Id, Query, QueryOpts) -> EXPR catch ERR:REASON:STACKTRACE -> - MSG = io_lib:format( - "call query failed, func: ~s, id: ~s, error: ~0p, Request: ~0p", - [??EXPR, Id, {ERR, REASON, STACKTRACE}, REQ], - [{chars_limit, 1024}] - ), - ?RESOURCE_ERROR(exception, MSG) + ?RESOURCE_ERROR(exception, #{ + name => NAME, + id => Id, + request => REQ, + error => {ERR, REASON}, + stacktrace => STACKTRACE + }) end ). @@ -454,7 +461,7 @@ apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) - ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), - Result = ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request), + Result = ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), Result; apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> @@ -462,6 +469,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> Name = maps:get(inflight_name, QueryOpts, undefined), WinSize = maps:get(inflight_window, QueryOpts, undefined), ?APPLY_RESOURCE( + call_query_async, case inflight_is_full(Name, WinSize) of true -> ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), @@ -484,7 +492,7 @@ apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> BatchLen = length(Batch), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen), - Result = ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch), + Result = ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), Result; apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> @@ -492,6 +500,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> Name = maps:get(inflight_name, QueryOpts, undefined), WinSize = maps:get(inflight_window, QueryOpts, undefined), ?APPLY_RESOURCE( + call_batch_query_async, case inflight_is_full(Name, WinSize) of true -> ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),