From 1625b8eaebc5a0ca33bbd5a4c97bf9726ab91af3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 26 Aug 2022 15:50:09 +0800 Subject: [PATCH] fix(mysql_bridge): export the query_mode option to the APIs --- .../src/emqx_connector_mysql.erl | 7 ++ .../src/emqx_resource_manager.erl | 15 +++- .../src/emqx_resource_worker.erl | 75 ++++++++++++------- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index fdab716cb..b9c200316 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -414,6 +414,13 @@ on_sql_query( LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared} ), Error; + {error, {1053, <<"08S01">>, Reason}} -> + %% mysql sql server shutdown in progress + ?SLOG( + error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} + ), + {recoverable_error, Reason}; {error, Reason} -> ?SLOG( error, diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index f90d042b0..db4b294a8 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -128,7 +128,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ok = emqx_metrics_worker:create_metrics( ?RES_METRICS, ResId, - [matched, success, failed, exception], + [ + matched, + sent, + dropped, + queued, + batched, + inflight, + 'sent.success', + 'sent.failed', + 'sent.exception', + 'dropped.inflight', + 'dropped.queued', + 'dropped.other' + ], [matched] ), ok = emqx_resource_worker_sup:start_workers(ResId, Opts), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 8034bcb9e..8b8b1467c 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -77,23 +77,27 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> Result = call_query(sync, Id, ?QUERY(self(), Request), #{}), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), _ = handle_query_result(Id, Result, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), _ = handle_query_result(Id, Result, false), Result. @@ -149,8 +153,10 @@ running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, block, St}; -running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> - Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), +running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when + is_list(Batch) +-> + Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), {next_state, block, St#{queue := Q1}}; running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); @@ -169,8 +175,10 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) -> {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; -blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> - Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), +blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when + is_list(Batch) +-> + Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), {keep_state, St#{queue := Q1}}; blocked(cast, resume, St) -> do_resume(St); @@ -179,12 +187,12 @@ blocked(state_timeout, resume, St) -> blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(From, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}; + {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request))])}}; blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> ReplayFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}. + {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}. terminate(_Reason, #{id := Id, index := Index}) -> gproc_pool:disconnect_worker(Id, {Id, Index}). @@ -206,10 +214,10 @@ estimate_size(QItem) -> Pid when is_pid(Pid) -> EXPR; _ -> - ?RESOURCE_ERROR(not_created, "resource not created") + ?RESOURCE_ERROR(worker_not_created, "resource not created") catch error:badarg -> - ?RESOURCE_ERROR(not_created, "resource not created"); + ?RESOURCE_ERROR(worker_not_created, "resource not created"); exit:{timeout, _} -> ?RESOURCE_ERROR(timeout, "call resource timeout") end @@ -277,18 +285,15 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, - case send_query(From, Request, Id, QueryOpts) of + Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), + case reply_caller(Id, ?REPLY(From, Request, Result)) of true -> Query = ?QUERY(From, Request), - {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}}; + {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}}; false -> {keep_state, St} end. -send_query(From, Request, Id, QueryOpts) -> - Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), - reply_caller(Id, ?REPLY(From, Request, Result)). - flush(#{acc := []} = St) -> {keep_state, St}; flush( @@ -307,14 +312,18 @@ flush( St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of true -> - Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St1#{queue := Q1}}; false -> {keep_state, St1} end. -maybe_append_queue(undefined, _Items) -> undefined; -maybe_append_queue(Q, Items) -> replayq:append(Q, Items). +maybe_append_queue(Id, undefined, _Items) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), + undefined; +maybe_append_queue(Id, Q, Items) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + replayq:append(Q, Items). batch_reply_caller(Id, BatchResult, Batch) -> lists:foldl( @@ -344,30 +353,40 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> handle_query_result(Id, Result, BlockWorker). handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, exception), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.exception'), BlockWorker; handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when NotWorking == not_connected; NotWorking == blocked -> true; -handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, _), BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), + BlockWorker; +handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, _), BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), + BlockWorker; +handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> + ?SLOG(error, #{msg => other_resource_error, reason => Reason}), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; handle_query_result(Id, {error, _}, BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, failed), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; -handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, failed), +handle_query_result(_Id, {recoverable_error, _}, _BlockWorker) -> true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, failed), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; handle_query_result(_Id, {async_return, ok}, BlockWorker) -> BlockWorker; handle_query_result(Id, Result, BlockWorker) -> assert_ok_result(Result), - emqx_metrics_worker:inc(?RES_METRICS, Id, success), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'), BlockWorker. call_query(QM0, Id, Query, QueryOpts) -> @@ -407,7 +426,7 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), @@ -419,7 +438,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -432,7 +451,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), @@ -444,7 +463,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},