From 6b0ccfbc43196a04ec8a7551f1656d4ce300827b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 24 Aug 2022 22:18:00 +0800 Subject: [PATCH 01/13] refactor: rename the error return resource_down -> recoverable_error --- apps/emqx_resource/include/emqx_resource.hrl | 2 +- .../src/emqx_resource_manager.erl | 2 +- .../src/emqx_resource_worker.erl | 13 ++++++----- .../test/emqx_connector_demo.erl | 2 +- .../test/emqx_resource_SUITE.erl | 2 +- .../src/emqx_rule_runtime.erl | 22 ++++++++++++++----- .../src/emqx_ee_connector_influxdb.erl | 17 +++++--------- 7 files changed, 33 insertions(+), 27 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa3f0a0f6..4d1c45eb4 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -77,7 +77,7 @@ ok | {ok, term()} | {error, term()} - | {resource_down, term()}. + | {recoverable_error, term()}. -define(WORKER_POOL_SIZE, 16). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 82db194dc..f90d042b0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -128,7 +128,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ok = emqx_metrics_worker:create_metrics( ?RES_METRICS, ResId, - [matched, success, failed, exception, resource_down], + [matched, success, failed, exception], [matched] ), ok = emqx_resource_worker_sup:start_workers(ResId, Opts), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 9c632b662..8034bcb9e 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -355,13 +355,14 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) -> handle_query_result(Id, {error, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, failed), BlockWorker; -handle_query_result(Id, {resource_down, _}, _BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down), +handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, failed), true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; -handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) -> - true; +handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, failed), + BlockWorker; handle_query_result(_Id, {async_return, ok}, BlockWorker) -> BlockWorker; handle_query_result(Id, Result, BlockWorker) -> @@ -390,8 +391,8 @@ call_query(QM0, Id, Query, QueryOpts) -> -define(APPLY_RESOURCE(EXPR, REQ), try %% if the callback module (connector) wants to return an error that - %% makes the current resource goes into the `error` state, it should - %% return `{resource_down, Reason}` + %% makes the current resource goes into the `blocked` state, it should + %% return `{recoverable_error, Reason}` EXPR catch ERR:REASON:STACKTRACE -> diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 6e7bca18a..4999b9410 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> Pid ! {From, {inc, N}}, receive {ReqRef, ok} -> ok; - {ReqRef, incorrect_status} -> {resource_down, incorrect_status} + {ReqRef, incorrect_status} -> {recoverable_error, incorrect_status} after 1000 -> {error, timeout} end; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 06160f6c7..0bfa67d07 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -419,7 +419,7 @@ t_query_counter_async_inflight(_) -> {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), ?assertMatch( - #{matched := M, success := S, exception := E, failed := F, resource_down := RD} when + #{matched := M, success := S, exception := E, failed := F, recoverable_error := RD} when M >= Sent andalso M == S + E + F + RD, C ), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index d7fa65829..aafce137c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -506,12 +506,22 @@ nested_put(Alias, Val, Columns0) -> -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). inc_action_metrics(ok, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); -inc_action_metrics({ok, _}, RuleId) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); -inc_action_metrics({resource_down, _}, RuleId) -> +inc_action_metrics({recoverable_error, _}, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -inc_action_metrics(_, RuleId) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'). +inc_action_metrics(R, RuleId) -> + case is_ok_result(R) of + false -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); + true -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success') + end. + +is_ok_result(ok) -> + true; +is_ok_result(R) when is_tuple(R) -> + ok = erlang:element(1, R); +is_ok_result(ok) -> + false. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index b83aec4bd..2024ba8ce 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -85,18 +85,13 @@ on_batch_query_async( InstId, BatchData, {ReplayFun, Args}, - State = #{write_syntax := SyntaxLines, client := Client} + #{write_syntax := SyntaxLines, client := Client} ) -> - case on_get_status(InstId, State) of - connected -> - case parse_batch_data(InstId, BatchData, SyntaxLines) of - {ok, Points} -> - do_async_query(InstId, Client, Points, {ReplayFun, Args}); - {error, Reason} -> - {error, Reason} - end; - disconnected -> - {resource_down, disconnected} + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_async_query(InstId, Client, Points, {ReplayFun, Args}); + {error, Reason} -> + {error, Reason} end. on_get_status(_InstId, #{client := Client}) -> From 1625b8eaebc5a0ca33bbd5a4c97bf9726ab91af3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 26 Aug 2022 15:50:09 +0800 Subject: [PATCH 02/13] fix(mysql_bridge): export the query_mode option to the APIs --- .../src/emqx_connector_mysql.erl | 7 ++ .../src/emqx_resource_manager.erl | 15 +++- .../src/emqx_resource_worker.erl | 75 ++++++++++++------- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index fdab716cb..b9c200316 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -414,6 +414,13 @@ on_sql_query( LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared} ), Error; + {error, {1053, <<"08S01">>, Reason}} -> + %% mysql sql server shutdown in progress + ?SLOG( + error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} + ), + {recoverable_error, Reason}; {error, Reason} -> ?SLOG( error, diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index f90d042b0..db4b294a8 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -128,7 +128,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ok = emqx_metrics_worker:create_metrics( ?RES_METRICS, ResId, - [matched, success, failed, exception], + [ + matched, + sent, + dropped, + queued, + batched, + inflight, + 'sent.success', + 'sent.failed', + 'sent.exception', + 'dropped.inflight', + 'dropped.queued', + 'dropped.other' + ], [matched] ), ok = emqx_resource_worker_sup:start_workers(ResId, Opts), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 8034bcb9e..8b8b1467c 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -77,23 +77,27 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> Result = call_query(sync, Id, ?QUERY(self(), Request), #{}), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), _ = handle_query_result(Id, Result, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), _ = handle_query_result(Id, Result, false), Result. @@ -149,8 +153,10 @@ running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, block, St}; -running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> - Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), +running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when + is_list(Batch) +-> + Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), {next_state, block, St#{queue := Q1}}; running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); @@ -169,8 +175,10 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) -> {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; -blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> - Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), +blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when + is_list(Batch) +-> + Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]), {keep_state, St#{queue := Q1}}; blocked(cast, resume, St) -> do_resume(St); @@ -179,12 +187,12 @@ blocked(state_timeout, resume, St) -> blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(From, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}; + {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request))])}}; blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> ReplayFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}. + {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}. terminate(_Reason, #{id := Id, index := Index}) -> gproc_pool:disconnect_worker(Id, {Id, Index}). @@ -206,10 +214,10 @@ estimate_size(QItem) -> Pid when is_pid(Pid) -> EXPR; _ -> - ?RESOURCE_ERROR(not_created, "resource not created") + ?RESOURCE_ERROR(worker_not_created, "resource not created") catch error:badarg -> - ?RESOURCE_ERROR(not_created, "resource not created"); + ?RESOURCE_ERROR(worker_not_created, "resource not created"); exit:{timeout, _} -> ?RESOURCE_ERROR(timeout, "call resource timeout") end @@ -277,18 +285,15 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, - case send_query(From, Request, Id, QueryOpts) of + Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), + case reply_caller(Id, ?REPLY(From, Request, Result)) of true -> Query = ?QUERY(From, Request), - {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}}; + {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}}; false -> {keep_state, St} end. -send_query(From, Request, Id, QueryOpts) -> - Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), - reply_caller(Id, ?REPLY(From, Request, Result)). - flush(#{acc := []} = St) -> {keep_state, St}; flush( @@ -307,14 +312,18 @@ flush( St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of true -> - Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]), + Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St1#{queue := Q1}}; false -> {keep_state, St1} end. -maybe_append_queue(undefined, _Items) -> undefined; -maybe_append_queue(Q, Items) -> replayq:append(Q, Items). +maybe_append_queue(Id, undefined, _Items) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), + undefined; +maybe_append_queue(Id, Q, Items) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + replayq:append(Q, Items). batch_reply_caller(Id, BatchResult, Batch) -> lists:foldl( @@ -344,30 +353,40 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> handle_query_result(Id, Result, BlockWorker). handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, exception), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.exception'), BlockWorker; handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when NotWorking == not_connected; NotWorking == blocked -> true; -handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, _), BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), + BlockWorker; +handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, _), BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), + BlockWorker; +handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> + ?SLOG(error, #{msg => other_resource_error, reason => Reason}), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; handle_query_result(Id, {error, _}, BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, failed), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; -handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, failed), +handle_query_result(_Id, {recoverable_error, _}, _BlockWorker) -> true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, failed), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; handle_query_result(_Id, {async_return, ok}, BlockWorker) -> BlockWorker; handle_query_result(Id, Result, BlockWorker) -> assert_ok_result(Result), - emqx_metrics_worker:inc(?RES_METRICS, Id, success), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'), BlockWorker. call_query(QM0, Id, Query, QueryOpts) -> @@ -407,7 +426,7 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), @@ -419,7 +438,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -432,7 +451,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), @@ -444,7 +463,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, From 6fde37791c3df2dbef49c054323d637140a1e506 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 10:14:10 +0800 Subject: [PATCH 03/13] refactor: new metrics for resources --- .../src/emqx_resource_manager.erl | 19 +++--- .../src/emqx_resource_worker.erl | 61 +++++++++++++------ 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index db4b294a8..3360c3c5d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -129,17 +129,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ?RES_METRICS, ResId, [ - matched, - sent, - dropped, - queued, - batched, - inflight, + 'matched', + 'sent', + 'dropped', + 'queued', + 'batched', + 'retried', 'sent.success', 'sent.failed', 'sent.exception', - 'dropped.inflight', - 'dropped.queued', + 'sent.inflight', + 'dropped.queue_not_enabled', + 'dropped.queue_full', + 'dropped.resource_not_found', + 'dropped.resource_stopped', 'dropped.other' ], [matched] diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 8b8b1467c..dabaf037c 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -77,27 +77,27 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> Result = call_query(sync, Id, ?QUERY(self(), Request), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false), Result. @@ -252,6 +252,7 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S case handle_query_result(Id, Result, false) of %% Send failed because resource down true -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), {keep_state, St0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> @@ -263,18 +264,20 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S inflight_drop(Name, Ref), St0; _ -> - St0#{queue => drop_head(Q)} + St0#{queue => drop_head(Id, Q)} end, {keep_state, St, {state_timeout, 0, resume}} end. -drop_head(Q) -> +drop_head(Id, Q) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1), Q1. -query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) -> +query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); @@ -308,6 +311,7 @@ flush( inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of @@ -322,8 +326,20 @@ maybe_append_queue(Id, undefined, _Items) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), undefined; maybe_append_queue(Id, Q, Items) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), - replayq:append(Q, Items). + case replayq:overflow(Q) of + Overflow when Overflow =< 0 -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + replayq:append(Q, Items); + Overflow -> + PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, + {Q1, QAckRef, Items} = replayq:pop(Q, PopOpts), + ok = replayq:ack(Q1, QAckRef), + Dropped = length(Items), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), + ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), + Q1 + end. batch_reply_caller(Id, BatchResult, Batch) -> lists:foldl( @@ -375,7 +391,8 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> handle_query_result(Id, {error, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; -handle_query_result(_Id, {recoverable_error, _}, _BlockWorker) -> +handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; @@ -426,7 +443,7 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), @@ -438,7 +455,8 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -451,7 +469,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), @@ -463,7 +481,8 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -477,14 +496,20 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> case reply_caller(Id, ?REPLY(From, Request, Result)) of - true -> ?MODULE:block(Pid); - false -> inflight_drop(Name, Ref) + true -> + ?MODULE:block(Pid); + false -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), + inflight_drop(Name, Ref) end. batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> case batch_reply_caller(Id, Result, Batch) of - true -> ?MODULE:block(Pid); - false -> inflight_drop(Name, Ref) + true -> + ?MODULE:block(Pid); + false -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)), + inflight_drop(Name, Ref) end. %%============================================================================== %% the inflight queue for async query From b5ad5233a189a572a8cd98a570a73a6faac0fe15 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 10:14:56 +0800 Subject: [PATCH 04/13] fix(mqtt-bridge): username and password defaults to undefined --- apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 51144748b..f4725020e 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -101,7 +101,7 @@ fields("server_configs") -> mk( binary(), #{ - default => "emqx", + default => undefined, desc => ?DESC("username") } )}, @@ -109,7 +109,7 @@ fields("server_configs") -> mk( binary(), #{ - default => "emqx", + default => undefined, format => <<"password">>, desc => ?DESC("password") } From 262e68f7d24cd7b9335d190ebcf5d42c7dfb2a66 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 10:16:02 +0800 Subject: [PATCH 05/13] fix: return error when receive HTTP code other than 2xx --- .../src/emqx_connector_http.erl | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 5658b385d..f63873fe3 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -275,7 +275,7 @@ on_query( ), NRequest = formalize_request(Method, BasePath, Request), case - Result = ehttpc:request( + ehttpc:request( case KeyOrNum of undefined -> PoolName; _ -> {PoolName, KeyOrNum} @@ -286,33 +286,35 @@ on_query( Retry ) of - {error, Reason} -> + {error, Reason} = Result -> ?SLOG(error, #{ msg => "http_connector_do_request_failed", request => NRequest, reason => Reason, connector => InstId - }); - {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> - ok; - {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> - ok; - {ok, StatusCode, _} -> + }), + Result; + {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> + Result; + {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> + Result; + {ok, StatusCode, Headers} -> ?SLOG(error, #{ msg => "http connector do request, received error response", request => NRequest, connector => InstId, status_code => StatusCode - }); - {ok, StatusCode, _, _} -> + }), + {error, #{status_code => StatusCode, headers => Headers}}; + {ok, StatusCode, Headers, Body} -> ?SLOG(error, #{ msg => "http connector do request, received error response", request => NRequest, connector => InstId, status_code => StatusCode - }) - end, - Result. + }), + {error, #{status_code => StatusCode, headers => Headers, body => Body}} + end. on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> case maps:get(request, State, undefined) of From e0a6a61d739c1bec961cc4c76d3f77ceb1052960 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 10:19:40 +0800 Subject: [PATCH 06/13] fix: return error on start hstreamdb crash --- .../emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl index 3892b7fc0..e4bbe8425 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl @@ -135,13 +135,15 @@ start_client(InstId, Config) -> do_start_client(InstId, Config) catch E:R:S -> - ?SLOG(error, #{ + Error = #{ msg => "start hstreamdb connector error", connector => InstId, error => E, reason => R, stack => S - }) + }, + ?SLOG(error, Error), + {error, Error} end. do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> From 65dfa63324cb0b34ad7d6b3ae2394c4755f9d015 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 12:28:01 +0800 Subject: [PATCH 07/13] fix: update the counters for data bridges --- apps/emqx_bridge/src/emqx_bridge_api.erl | 195 +++++++++++++++--- .../src/emqx_connector_http.erl | 7 + 2 files changed, 172 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 36f8cf0f5..2f15bbcd7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -57,22 +57,95 @@ end ). --define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ - matched => MATCH, - success => SUCC, - failed => FAILED, - rate => RATE, - rate_last5m => RATE_5, - rate_max => RATE_MAX -}). --define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ - matched := MATCH, - success := SUCC, - failed := FAILED, - rate := RATE, - rate_last5m := RATE_5, - rate_max := RATE_MAX -}). +-define(EMPTY_METRICS, + ?METRICS( + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + ) +). + +-define(METRICS( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX +), + #{ + 'batched' => Batched, + 'dropped' => Dropped, + 'dropped.other' => DroppedOther, + 'dropped.queue_full' => DroppedQueueFull, + 'dropped.queue_not_enabled' => DroppedQueueNotEnabled, + 'dropped.resource_not_found' => DroppedResourceNotFound, + 'dropped.resource_stopped' => DroppedResourceStopped, + 'matched' => Matched, + 'queued' => Queued, + 'retried' => Retried, + 'sent' => Sent, + 'sent.exception' => SentExcpt, + 'sent.failed' => SentFailed, + 'sent.inflight' => SentInflight, + 'sent.success' => SentSucc, + rate => RATE, + rate_last5m => RATE_5, + rate_max => RATE_MAX + } +). + +-define(metrics( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX +), + #{ + 'batched' := Batched, + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'queued' := Queued, + 'retried' := Retried, + 'sent' := Sent, + 'sent.exception' := SentExcpt, + 'sent.failed' := SentFailed, + 'sent.inflight' := SentInflight, + 'sent.success' := SentSucc, + rate := RATE, + rate_last5m := RATE_5, + rate_max := RATE_MAX + } +). namespace() -> "bridge". @@ -193,11 +266,11 @@ method_example(_Type, put) -> maybe_with_metrics_example(TypeNameExam, get) -> TypeNameExam#{ - metrics => ?METRICS(0, 0, 0, 0, 0, 0), + metrics => ?EMPTY_METRICS, node_metrics => [ #{ node => node(), - metrics => ?METRICS(0, 0, 0, 0, 0, 0) + metrics => ?EMPTY_METRICS } ] }; @@ -217,7 +290,16 @@ info_example_basic(webhook) -> ssl => #{enable => false}, local_topic => <<"emqx_webhook/#">>, method => post, - body => <<"${payload}">> + body => <<"${payload}">>, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => 15000, + auto_restart_interval => 15000, + query_mode => sync, + async_inflight_window => 100, + enable_queue => true, + max_queue_bytes => 1024 * 1024 * 1024 + } }; info_example_basic(mqtt) -> (mqtt_main_example())#{ @@ -619,19 +701,37 @@ collect_metrics(Bridges) -> [maps:with([node, metrics], B) || B <- Bridges]. aggregate_metrics(AllMetrics) -> - InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0), + InitMetrics = ?EMPTY_METRICS, lists:foldl( fun( - #{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)}, - ?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0) + #{ + metrics := ?metrics( + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18 + ) + }, + ?metrics( + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18 + ) ) -> ?METRICS( - Match1 + Match0, - Succ1 + Succ0, - Failed1 + Failed0, - Rate1 + Rate0, - Rate5m1 + Rate5m0, - RateMax1 + RateMax0 + M1 + N1, + M2 + N2, + M3 + N3, + M4 + N4, + M5 + N5, + M6 + N6, + M7 + N7, + M8 + N8, + M9 + N9, + M10 + N10, + M11 + N11, + M12 + N12, + M13 + N13, + M14 + N14, + M15 + N15, + M16 + N16, + M17 + N17, + M18 + N18 ) end, InitMetrics, @@ -660,12 +760,47 @@ format_resp( }. format_metrics(#{ - counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ}, + counters := #{ + 'batched' := Batched, + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'queued' := Queued, + 'retried' := Retried, + 'sent' := Sent, + 'sent.exception' := SentExcpt, + 'sent.failed' := SentFailed, + 'sent.inflight' := SentInflight, + 'sent.success' := SentSucc + }, rate := #{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} } }) -> - ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax). + ?METRICS( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + Rate, + Rate5m, + RateMax + ). fill_defaults(Type, RawConf) -> PackedConf = pack_bridge_conf(Type, RawConf), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index f63873fe3..2562bf272 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -286,6 +286,13 @@ on_query( Retry ) of + {error, econnrefused} -> + ?SLOG(warning, #{ + msg => "http_connector_do_request_failed", + reason => econnrefused, + connector => InstId + }), + {recoverable_error, econnrefused}; {error, Reason} = Result -> ?SLOG(error, #{ msg => "http_connector_do_request_failed", From c4106c0d7751cecf9dc5765131682fd348a0da34 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 12:28:43 +0800 Subject: [PATCH 08/13] fix: resume the resource worker on health check success --- apps/emqx_resource/src/emqx_resource_manager.erl | 7 +++++++ apps/emqx_resource/src/emqx_resource_worker_sup.erl | 6 +++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 3360c3c5d..261863d4c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -555,6 +555,7 @@ with_health_check(Data, Func) -> HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data), _ = maybe_alarm(Status, ResId), + ok = maybe_resume_resource_workers(Status), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, @@ -575,6 +576,12 @@ maybe_alarm(_Status, ResId) -> <<"resource down: ", ResId/binary>> ). +maybe_resume_resource_workers(connected) -> + {_, Pid, _, _} = supervisor:which_children(emqx_resource_worker_sup), + emqx_resource_worker:resume(Pid); +maybe_resume_resource_workers(_) -> + ok. + maybe_clear_alarm(<>) -> ok; maybe_clear_alarm(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl index 5305eddaf..2db7b5c4c 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl @@ -107,7 +107,7 @@ ensure_worker_started(ResId, Idx, Opts) -> type => worker, modules => [Mod] }, - case supervisor:start_child(emqx_resource_sup, Spec) of + case supervisor:start_child(?SERVER, Spec) of {ok, _Pid} -> ok; {error, {already_started, _}} -> ok; {error, already_present} -> ok; @@ -116,9 +116,9 @@ ensure_worker_started(ResId, Idx, Opts) -> ensure_worker_removed(ResId, Idx) -> ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx), - case supervisor:terminate_child(emqx_resource_sup, ChildId) of + case supervisor:terminate_child(?SERVER, ChildId) of ok -> - Res = supervisor:delete_child(emqx_resource_sup, ChildId), + Res = supervisor:delete_child(?SERVER, ChildId), _ = gproc_pool:remove_worker(ResId, {ResId, Idx}), Res; {error, not_found} -> From 9e50866cd0ab7d5d5682c4b06cf7df97dbc10117 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 17:13:49 +0800 Subject: [PATCH 09/13] fix: rename queue_max_bytes -> max_queue_bytes --- apps/emqx/src/emqx_metrics_worker.erl | 2 +- .../i18n/emqx_resource_schema_i18n.conf | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 9 +++-- .../src/emqx_resource_manager.erl | 8 +++-- .../src/emqx_resource_worker.erl | 36 +++++++++++-------- .../src/schema/emqx_resource_schema.erl | 12 +++---- .../src/emqx_rule_runtime.erl | 2 +- 7 files changed, 42 insertions(+), 29 deletions(-) diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index 21e73ff51..ab6a0b1a6 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -173,7 +173,7 @@ get_metrics(Name, Id) -> inc(Name, Id, Metric) -> inc(Name, Id, Metric, 1). --spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok. +-spec inc(handler_name(), metric_id(), atom(), integer()) -> ok. inc(Name, Id, Metric, Val) -> counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val). diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 0d53b813e..d7953ac3b 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -143,7 +143,7 @@ emqx_resource_schema { } } - queue_max_bytes { + max_queue_bytes { desc { en: """Maximum queue storage.""" zh: """消息队列的最大长度。""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 4d1c45eb4..2409a7069 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -68,7 +68,7 @@ batch_size => pos_integer(), batch_time => pos_integer(), enable_queue => boolean(), - queue_max_bytes => pos_integer(), + max_queue_bytes => pos_integer(), query_mode => query_mode(), resume_interval => pos_integer(), async_inflight_window => pos_integer() @@ -81,8 +81,11 @@ -define(WORKER_POOL_SIZE, 16). --define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). --define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>). +-define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024). +-define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>). + +-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024 * 1024). +-define(DEFAULT_QUEUE_SIZE_RAW, <<"100GB">>). %% count -define(DEFAULT_BATCH_SIZE, 100). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 261863d4c..eaf0a9a5f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -577,8 +577,12 @@ maybe_alarm(_Status, ResId) -> ). maybe_resume_resource_workers(connected) -> - {_, Pid, _, _} = supervisor:which_children(emqx_resource_worker_sup), - emqx_resource_worker:resume(Pid); + lists:foreach( + fun({_, Pid, _, _}) -> + emqx_resource_worker:resume(Pid) + end, + supervisor:which_children(emqx_resource_worker_sup) + ); maybe_resume_resource_workers(_) -> ok. diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index dabaf037c..013f430b1 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -123,13 +123,15 @@ init({Id, Index, Opts}) -> true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), - seg_bytes => maps:get(queue_max_bytes, Opts, ?DEFAULT_QUEUE_SIZE), + seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), + max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), sizer => fun ?MODULE:estimate_size/1, marshaller => fun ?MODULE:queue_item_marshaller/1 }); false -> undefined end, + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', replayq:count(Queue)), ok = inflight_new(Name), St = #{ id => Id, @@ -323,23 +325,27 @@ flush( end. maybe_append_queue(Id, undefined, _Items) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), undefined; maybe_append_queue(Id, Q, Items) -> - case replayq:overflow(Q) of - Overflow when Overflow =< 0 -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), - replayq:append(Q, Items); - Overflow -> - PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, - {Q1, QAckRef, Items} = replayq:pop(Q, PopOpts), - ok = replayq:ack(Q1, QAckRef), - Dropped = length(Items), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), - ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), - Q1 - end. + Q2 = + case replayq:overflow(Q) of + Overflow when Overflow =< 0 -> + Q; + Overflow -> + PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, + {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), + ok = replayq:ack(Q1, QAckRef), + Dropped = length(Items2), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), + ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), + Q1 + end, + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + replayq:append(Q2, Items). batch_reply_caller(Id, BatchResult, Batch) -> lists:foldl( diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index fe8564a41..9e54c8a7b 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -53,7 +53,7 @@ fields("creation_opts") -> {batch_size, fun batch_size/1}, {batch_time, fun batch_time/1}, {enable_queue, fun enable_queue/1}, - {max_queue_bytes, fun queue_max_bytes/1} + {max_queue_bytes, fun max_queue_bytes/1} ]. worker_pool_size(type) -> pos_integer(); @@ -110,11 +110,11 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW; batch_time(required) -> false; batch_time(_) -> undefined. -queue_max_bytes(type) -> emqx_schema:bytesize(); -queue_max_bytes(desc) -> ?DESC("queue_max_bytes"); -queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; -queue_max_bytes(required) -> false; -queue_max_bytes(_) -> undefined. +max_queue_bytes(type) -> emqx_schema:bytesize(); +max_queue_bytes(desc) -> ?DESC("max_queue_bytes"); +max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; +max_queue_bytes(required) -> false; +max_queue_bytes(_) -> undefined. desc("creation_opts") -> ?DESC("creation_opts"). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index aafce137c..847c4ff00 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -522,6 +522,6 @@ inc_action_metrics(R, RuleId) -> is_ok_result(ok) -> true; is_ok_result(R) when is_tuple(R) -> - ok = erlang:element(1, R); + ok == erlang:element(1, R); is_ok_result(ok) -> false. From 73e19d84ee2c169f6783a8bcd2de9450234adbb3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 23:47:58 +0800 Subject: [PATCH 10/13] feat: use the new metrics to bridge APIs --- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 152 ++++++++++++++++-- apps/emqx_bridge/include/emqx_bridge.hrl | 89 ++++++++++ apps/emqx_bridge/src/emqx_bridge_api.erl | 99 +----------- .../src/schema/emqx_bridge_schema.erl | 21 ++- .../test/emqx_bridge_mqtt_SUITE.erl | 64 ++++++-- .../src/emqx_connector_mqtt.erl | 3 +- apps/emqx_resource/src/emqx_resource.erl | 17 +- .../src/emqx_resource_manager.erl | 4 +- .../src/emqx_resource_worker.erl | 8 +- .../test/emqx_resource_SUITE.erl | 8 +- .../src/emqx_rule_sqltester.erl | 18 ++- 11 files changed, 327 insertions(+), 156 deletions(-) create mode 100644 apps/emqx_bridge/include/emqx_bridge.hrl diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index 704fd7bd7..06cc41a91 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -78,36 +78,149 @@ emqx_bridge_schema { } } + metric_batched { + desc { + en: """Count of messages that are currently accumulated in memory waiting for sending in one batch.""" + zh: """当前积压在内存里,等待批量发送的消息个数""" + } + label: { + en: "Batched" + zh: "等待批量发送" + } + } + + metric_dropped { + desc { + en: """Count of messages dropped.""" + zh: """被丢弃的消息个数。""" + } + label: { + en: "Dropped" + zh: "丢弃" + } + } + + metric_dropped_other { + desc { + en: """Count of messages dropped due to other reasons.""" + zh: """因为其他原因被丢弃的消息个数。""" + } + label: { + en: "Dropped Other" + zh: "其他丢弃" + } + } + metric_dropped_queue_full { + desc { + en: """Count of messages dropped due to the queue is full.""" + zh: """因为队列已满被丢弃的消息个数。""" + } + label: { + en: "Dropped Queue Full" + zh: "队列已满被丢弃" + } + } + metric_dropped_queue_not_enabled { + desc { + en: """Count of messages dropped due to the queue is not enabled.""" + zh: """因为队列未启用被丢弃的消息个数。""" + } + label: { + en: "Dropped Queue Disabled" + zh: "队列未启用被丢弃" + } + } + metric_dropped_resource_not_found { + desc { + en: """Count of messages dropped due to the resource is not found.""" + zh: """因为资源不存在被丢弃的消息个数。""" + } + label: { + en: "Dropped Resource NotFound" + zh: "资源不存在被丢弃" + } + } + metric_dropped_resource_stopped { + desc { + en: """Count of messages dropped due to the resource is stopped.""" + zh: """因为资源已停用被丢弃的消息个数。""" + } + label: { + en: "Dropped Resource Stopped" + zh: "资源停用被丢弃" + } + } metric_matched { desc { - en: """Count of this bridge is queried""" - zh: """Bridge 执行操作的次数""" + en: """Count of this bridge is matched and queried.""" + zh: """Bridge 被匹配到(被请求)的次数。""" } label: { - en: "Bridge Matched" - zh: "Bridge 执行操作的次数" + en: "Matched" + zh: "匹配次数" } } - metric_success { + metric_queued { desc { - en: """Count of query success""" - zh: """Bridge 执行操作成功的次数""" + en: """Count of messages that are currently queued.""" + zh: """当前被缓存到磁盘队列的消息个数。""" } label: { - en: "Bridge Success" - zh: "Bridge 执行操作成功的次数" + en: "Queued" + zh: "被缓存" + } + } + metric_sent { + desc { + en: """Count of messages that are sent by this bridge.""" + zh: """已经发送出去的消息个数。""" + } + label: { + en: "Sent" + zh: "已发送" + } + } + metric_sent_exception { + desc { + en: """Count of messages that were sent but exceptions occur.""" + zh: """发送出现异常的消息个数。""" + } + label: { + en: "Sent Exception" + zh: "发送异常" } } - metric_failed { + metric_sent_failed { desc { - en: """Count of query failed""" - zh: """Bridge 执行操作失败的次数""" + en: """Count of messages that sent failed.""" + zh: """发送失败的消息个数。""" } label: { - en: "Bridge Failed" - zh: "Bridge 执行操作失败的次数" + en: "Sent Failed" + zh: "发送失败" + } + } + + metric_sent_inflight { + desc { + en: """Count of messages that were sent asynchronously but ACKs are not received.""" + zh: """已异步地发送但没有收到 ACK 的消息个数。""" + } + label: { + en: "Sent Inflight" + zh: "已发送未确认" + } + } + metric_sent_success { + desc { + en: """Count of messages that sent successfully.""" + zh: """已经发送成功的消息个数。""" + } + label: { + en: "Sent Success" + zh: "发送成功" } } @@ -144,6 +257,17 @@ emqx_bridge_schema { } } + metric_received { + desc { + en: """Count of messages that is received from the remote system.""" + zh: """从远程系统收到的消息个数。""" + } + label: { + en: "Received" + zh: "已接收" + } + } + desc_bridges { desc { en: """Configuration for MQTT bridges.""" diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl new file mode 100644 index 000000000..217c403b9 --- /dev/null +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -0,0 +1,89 @@ +-define(EMPTY_METRICS, + ?METRICS( + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + ) +). + +-define(METRICS( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX, + Rcvd +), + #{ + 'batched' => Batched, + 'dropped' => Dropped, + 'dropped.other' => DroppedOther, + 'dropped.queue_full' => DroppedQueueFull, + 'dropped.queue_not_enabled' => DroppedQueueNotEnabled, + 'dropped.resource_not_found' => DroppedResourceNotFound, + 'dropped.resource_stopped' => DroppedResourceStopped, + 'matched' => Matched, + 'queued' => Queued, + 'sent' => Sent, + 'sent.exception' => SentExcpt, + 'sent.failed' => SentFailed, + 'sent.inflight' => SentInflight, + 'sent.success' => SentSucc, + rate => RATE, + rate_last5m => RATE_5, + rate_max => RATE_MAX, + received => Rcvd + } +). + +-define(metrics( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX, + Rcvd +), + #{ + 'batched' := Batched, + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'queued' := Queued, + 'sent' := Sent, + 'sent.exception' := SentExcpt, + 'sent.failed' := SentFailed, + 'sent.inflight' := SentInflight, + 'sent.success' := SentSucc, + rate := RATE, + rate_last5m := RATE_5, + rate_max := RATE_MAX, + received := Rcvd + } +). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 2f15bbcd7..ddffcb79f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). -import(hoconsc, [mk/2, array/1, enum/1]). @@ -57,96 +58,6 @@ end ). --define(EMPTY_METRICS, - ?METRICS( - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 - ) -). - --define(METRICS( - Batched, - Dropped, - DroppedOther, - DroppedQueueFull, - DroppedQueueNotEnabled, - DroppedResourceNotFound, - DroppedResourceStopped, - Matched, - Queued, - Retried, - Sent, - SentExcpt, - SentFailed, - SentInflight, - SentSucc, - RATE, - RATE_5, - RATE_MAX -), - #{ - 'batched' => Batched, - 'dropped' => Dropped, - 'dropped.other' => DroppedOther, - 'dropped.queue_full' => DroppedQueueFull, - 'dropped.queue_not_enabled' => DroppedQueueNotEnabled, - 'dropped.resource_not_found' => DroppedResourceNotFound, - 'dropped.resource_stopped' => DroppedResourceStopped, - 'matched' => Matched, - 'queued' => Queued, - 'retried' => Retried, - 'sent' => Sent, - 'sent.exception' => SentExcpt, - 'sent.failed' => SentFailed, - 'sent.inflight' => SentInflight, - 'sent.success' => SentSucc, - rate => RATE, - rate_last5m => RATE_5, - rate_max => RATE_MAX - } -). - --define(metrics( - Batched, - Dropped, - DroppedOther, - DroppedQueueFull, - DroppedQueueNotEnabled, - DroppedResourceNotFound, - DroppedResourceStopped, - Matched, - Queued, - Retried, - Sent, - SentExcpt, - SentFailed, - SentInflight, - SentSucc, - RATE, - RATE_5, - RATE_MAX -), - #{ - 'batched' := Batched, - 'dropped' := Dropped, - 'dropped.other' := DroppedOther, - 'dropped.queue_full' := DroppedQueueFull, - 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, - 'dropped.resource_not_found' := DroppedResourceNotFound, - 'dropped.resource_stopped' := DroppedResourceStopped, - 'matched' := Matched, - 'queued' := Queued, - 'retried' := Retried, - 'sent' := Sent, - 'sent.exception' := SentExcpt, - 'sent.failed' := SentFailed, - 'sent.inflight' := SentInflight, - 'sent.success' := SentSucc, - rate := RATE, - rate_last5m := RATE_5, - rate_max := RATE_MAX - } -). - namespace() -> "bridge". api_spec() -> @@ -770,12 +681,12 @@ format_metrics(#{ 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'queued' := Queued, - 'retried' := Retried, 'sent' := Sent, 'sent.exception' := SentExcpt, 'sent.failed' := SentFailed, 'sent.inflight' := SentInflight, - 'sent.success' := SentSucc + 'sent.success' := SentSucc, + 'received' := Rcvd }, rate := #{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} @@ -791,7 +702,6 @@ format_metrics(#{ DroppedResourceStopped, Matched, Queued, - Retried, Sent, SentExcpt, SentFailed, @@ -799,7 +709,8 @@ format_metrics(#{ SentSucc, Rate, Rate5m, - RateMax + RateMax, + Rcvd ). fill_defaults(Type, RawConf) -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index aedfcaa03..f55ac840e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -102,16 +102,31 @@ fields(bridges) -> ] ++ ee_fields_bridges(); fields("metrics") -> [ + {"batched", mk(integer(), #{desc => ?DESC("metric_batched")})}, + {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, + {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})}, + {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})}, + {"dropped.queue_not_enabled", + mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})}, + {"dropped.resource_not_found", + mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})}, + {"dropped.resource_stopped", + mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, - {"success", mk(integer(), #{desc => ?DESC("metric_success")})}, - {"failed", mk(integer(), #{desc => ?DESC("metric_failed")})}, + {"queued", mk(integer(), #{desc => ?DESC("metric_queued")})}, + {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, + {"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})}, + {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, + {"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, + {"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, {"rate", mk(float(), #{desc => ?DESC("metric_rate")})}, {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})}, {"rate_last5m", mk( float(), #{desc => ?DESC("metric_rate_last5m")} - )} + )}, + {"received", mk(float(), #{desc => ?DESC("metric_received")})} ]; fields("node_metrics") -> [ diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 27c101728..02b76d64b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -66,15 +66,6 @@ } }). --define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{ - <<"matched">> := MATCH, - <<"success">> := SUCC, - <<"failed">> := FAILED, - <<"rate">> := SPEED, - <<"rate_last5m">> := SPEED5M, - <<"rate_max">> := SPEEDMAX -}). - inspect(Selected, _Envs, _Args) -> persistent_term:put(?MODULE, #{inspect => Selected}). @@ -185,6 +176,23 @@ t_mqtt_conn_bridge_ingress(_) -> end ), + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 0, <<"received">> := 1} + } + ] + }, + jsx:decode(BridgeStr) + ), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -237,9 +245,15 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), ?assertMatch( #{ - <<"metrics">> := ?metrics(1, 1, 0, _, _, _), + <<"metrics">> := #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0}, <<"node_metrics">> := - [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0} + } + ] }, jsx:decode(BridgeStr) ), @@ -337,6 +351,23 @@ t_ingress_mqtt_bridge_with_rules(_) -> persistent_term:get(?MODULE) ), + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 0, <<"received">> := 1} + } + ] + }, + jsx:decode(BridgeStr) + ), + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []). @@ -433,9 +464,16 @@ t_egress_mqtt_bridge_with_rules(_) -> {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), ?assertMatch( #{ - <<"metrics">> := ?metrics(2, 2, 0, _, _, _), + <<"metrics">> := #{<<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0}, <<"node_metrics">> := - [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0 + } + } + ] }, jsx:decode(BridgeStr) ), diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index bdf43885a..3ce6925bc 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -135,8 +135,7 @@ drop_bridge(Name) -> %% When use this bridge as a data source, ?MODULE:on_message_received will be called %% if the bridge received msgs from the remote broker. on_message_received(Msg, HookPoint, ResId) -> - emqx_resource:inc_matched(ResId), - emqx_resource:inc_success(ResId), + emqx_resource:inc_received(ResId), emqx:run_hook(HookPoint, [Msg]). %% =================================================================== diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b07460643..c1d500e8b 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -110,7 +110,7 @@ list_group_instances/1 ]). --export([inc_metrics_funcs/1, inc_matched/1, inc_success/1, inc_failed/1]). +-export([inc_received/1]). -optional_callbacks([ on_query/3, @@ -443,19 +443,8 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> %% ================================================================================= -inc_matched(ResId) -> - emqx_metrics_worker:inc(?RES_METRICS, ResId, matched). - -inc_success(ResId) -> - emqx_metrics_worker:inc(?RES_METRICS, ResId, success). - -inc_failed(ResId) -> - emqx_metrics_worker:inc(?RES_METRICS, ResId, failed). +inc_received(ResId) -> + emqx_metrics_worker:inc(?RES_METRICS, ResId, 'received'). filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. - -inc_metrics_funcs(ResId) -> - OnSucc = [{fun ?MODULE:inc_success/1, ResId}], - OnFailed = [{fun ?MODULE:inc_failed/1, ResId}], - {OnSucc, OnFailed}. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index eaf0a9a5f..2581a3001 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -134,7 +134,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'dropped', 'queued', 'batched', - 'retried', 'sent.success', 'sent.failed', 'sent.exception', @@ -143,7 +142,8 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'dropped.queue_full', 'dropped.resource_not_found', 'dropped.resource_stopped', - 'dropped.other' + 'dropped.other', + 'received' ], [matched] ), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 013f430b1..3a12a6b91 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -131,7 +131,7 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', replayq:count(Queue)), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)), ok = inflight_new(Name), St = #{ id => Id, @@ -254,7 +254,6 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S case handle_query_result(Id, Result, false) of %% Send failed because resource down true -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), {keep_state, St0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> @@ -569,6 +568,11 @@ assert_ok_result(R) when is_tuple(R) -> assert_ok_result(R) -> error({not_ok_result, R}). +queue_count(undefined) -> + 0; +queue_count(Q) -> + replayq:count(Q). + -spec name(id(), integer()) -> atom(). name(Id, Index) -> Mod = atom_to_list(?MODULE), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 0bfa67d07..68b4fb6dd 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -268,7 +268,7 @@ t_query_counter_async_query(_) -> end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), - ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C), + ?assertMatch(#{matched := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), ok = emqx_resource:remove_local(?ID). t_query_counter_async_callback(_) -> @@ -309,7 +309,7 @@ t_query_counter_async_callback(_) -> end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), - ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C), + ?assertMatch(#{matched := 1002, sent := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), ?assertMatch(1000, ets:info(Tab0, size)), ?assert( lists:all( @@ -419,8 +419,8 @@ t_query_counter_async_inflight(_) -> {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), ?assertMatch( - #{matched := M, success := S, exception := E, failed := F, recoverable_error := RD} when - M >= Sent andalso M == S + E + F + RD, + #{matched := M, sent := St, 'sent.success' := Ss, dropped := D} when + St == Ss andalso M == St + D, C ), ?assert( diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index c333bb80e..9669e0113 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -62,14 +62,16 @@ test_rule(Sql, Select, Context, EventTopics) -> }, FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)), try emqx_rule_runtime:apply_rule(Rule, FullContext, #{}) of - {ok, Data} -> {ok, flatten(Data)}; - {error, Reason} -> {error, Reason} + {ok, Data} -> + {ok, flatten(Data)}; + {error, Reason} -> + {error, Reason} after ok = emqx_rule_engine:clear_metrics_for_rule(RuleId) end. get_selected_data(Selected, _Envs, _Args) -> - Selected. + {ok, Selected}. is_publish_topic(<<"$events/", _/binary>>) -> false; is_publish_topic(<<"$bridges/", _/binary>>) -> false; @@ -77,14 +79,14 @@ is_publish_topic(_Topic) -> true. flatten([]) -> []; -flatten([D1]) -> - D1; -flatten([D1 | L]) when is_list(D1) -> - D1 ++ flatten(L). +flatten([{ok, D}]) -> + D; +flatten([D | L]) when is_list(D) -> + [D0 || {ok, D0} <- D] ++ flatten(L). echo_action(Data, Envs) -> ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}), - Data. + {ok, Data}. fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context). From ca52b8eb297b0f128c3b6305d4ff79b1350eee76 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 31 Aug 2022 09:18:10 +0800 Subject: [PATCH 11/13] fix: start connector-mqtt failed when username/password not provided --- .../src/emqx_connector_mqtt.erl | 43 ++++++++++++------- apps/emqx_rule_engine/include/rule_engine.hrl | 14 +++--- .../src/emqx_rule_runtime.erl | 12 +++--- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 3ce6925bc..915fd8007 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -224,20 +224,20 @@ make_forward_confs(undefined) -> make_forward_confs(FrowardConf) -> FrowardConf. -basic_config(#{ - server := Server, - reconnect_interval := ReconnIntv, - proto_ver := ProtoVer, - bridge_mode := BridgeMode, - username := User, - password := Password, - clean_start := CleanStart, - keepalive := KeepAlive, - retry_interval := RetryIntv, - max_inflight := MaxInflight, - ssl := #{enable := EnableSsl} = Ssl -}) -> +basic_config( #{ + server := Server, + reconnect_interval := ReconnIntv, + proto_ver := ProtoVer, + bridge_mode := BridgeMode, + clean_start := CleanStart, + keepalive := KeepAlive, + retry_interval := RetryIntv, + max_inflight := MaxInflight, + ssl := #{enable := EnableSsl} = Ssl + } = Conf +) -> + BaiscConf = #{ %% connection opts server => Server, %% 30s @@ -251,8 +251,6 @@ basic_config(#{ %% non-standard mqtt connection packets will be filtered out by LB. %% So let's disable bridge_mode. bridge_mode => BridgeMode, - username => User, - password => Password, clean_start => CleanStart, keepalive => ms_to_s(KeepAlive), retry_interval => RetryIntv, @@ -260,7 +258,20 @@ basic_config(#{ ssl => EnableSsl, ssl_opts => maps:to_list(maps:remove(enable, Ssl)), if_record_metrics => true - }. + }, + maybe_put_fields([username, password], Conf, BaiscConf). + +maybe_put_fields(Fields, Conf, Acc0) -> + lists:foldl( + fun(Key, Acc) -> + case maps:find(Key, Conf) of + error -> Acc; + {ok, Val} -> Acc#{Key => Val} + end + end, + Acc0, + Fields + ). ms_to_s(Ms) -> erlang:ceil(Ms / 1000). diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 77d371711..d15db24be 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -88,18 +88,18 @@ %% Logical operators -define(is_logical(Op), (Op =:= 'and' orelse Op =:= 'or')). --define(RAISE(_EXP_, _ERROR_), - ?RAISE(_EXP_, _ = do_nothing, _ERROR_) +-define(RAISE(EXP, ERROR), + ?RAISE(EXP, _ = do_nothing, ERROR) ). --define(RAISE(_EXP_, _EXP_ON_FAIL_, _ERROR_), +-define(RAISE(EXP, EXP_ON_FAIL, ERROR), fun() -> try - (_EXP_) + (EXP) catch - _EXCLASS_:_EXCPTION_:_ST_ -> - _EXP_ON_FAIL_, - throw(_ERROR_) + EXCLASS:EXCPTION:ST -> + EXP_ON_FAIL, + throw(ERROR) end end() ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 847c4ff00..9ec69f1d7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -130,13 +130,13 @@ do_apply_rule( ) -> {Selected, Collection} = ?RAISE( select_and_collect(Fields, Columns), - {select_and_collect_error, {_EXCLASS_, _EXCPTION_, _ST_}} + {select_and_collect_error, {EXCLASS, EXCPTION, ST}} ), ColumnsAndSelected = maps:merge(Columns, Selected), case ?RAISE( match_conditions(Conditions, ColumnsAndSelected), - {match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}} + {match_conditions_error, {EXCLASS, EXCPTION, ST}} ) of true -> @@ -166,12 +166,12 @@ do_apply_rule( ) -> Selected = ?RAISE( select_and_transform(Fields, Columns), - {select_and_transform_error, {_EXCLASS_, _EXCPTION_, _ST_}} + {select_and_transform_error, {EXCLASS, EXCPTION, ST}} ), case ?RAISE( match_conditions(Conditions, maps:merge(Columns, Selected)), - {match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}} + {match_conditions_error, {EXCLASS, EXCPTION, ST}} ) of true -> @@ -245,7 +245,7 @@ filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) -> case ?RAISE( match_conditions(InCase, ColumnsAndItem), - {match_incase_error, {_EXCLASS_, _EXCPTION_, _ST_}} + {match_incase_error, {EXCLASS, EXCPTION, ST}} ) of true when DoEach == [] -> {true, ColumnsAndItem}; @@ -253,7 +253,7 @@ filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) -> {true, ?RAISE( select_and_transform(DoEach, ColumnsAndItem), - {doeach_error, {_EXCLASS_, _EXCPTION_, _ST_}} + {doeach_error, {EXCLASS, EXCPTION, ST}} )}; false -> false From 14633eaac876a055a0f651a30c962ee784d8d873 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 31 Aug 2022 09:40:22 +0800 Subject: [PATCH 12/13] fix: please the elvis --- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 9ec69f1d7..b1d563291 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -42,6 +42,10 @@ -type alias() :: atom(). -type collection() :: {alias(), [term()]}. +-elvis([ + {elvis_style, invalid_dynamic_call, #{ignore => [emqx_rule_runtime]}} +]). + -define(ephemeral_alias(TYPE, NAME), iolist_to_binary(io_lib:format("_v_~ts_~p_~p", [TYPE, NAME, erlang:system_time()])) ). @@ -271,7 +275,7 @@ match_conditions({'not', Var}, Data) -> case eval(Var, Data) of Bool when is_boolean(Bool) -> not Bool; - _other -> + _Other -> false end; match_conditions({in, Var, {list, Vals}}, Data) -> From ba1f5eecd32028c426b055bc4a211d0f2f5f381b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 31 Aug 2022 11:14:36 +0800 Subject: [PATCH 13/13] fix: update the swagger for new resource metrics --- apps/emqx_bridge/include/emqx_bridge.hrl | 10 ++++++++++ .../emqx_ee_bridge/include/emqx_ee_bridge.hrl | 18 ------------------ .../src/emqx_ee_bridge_hstreamdb.erl | 2 +- .../src/emqx_ee_bridge_influxdb.erl | 2 +- .../src/emqx_ee_bridge_mysql.erl | 2 +- 5 files changed, 13 insertions(+), 21 deletions(-) diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index 217c403b9..bb8ee6e29 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -87,3 +87,13 @@ received := Rcvd } ). + +-define(METRICS_EXAMPLE, #{ + metrics => ?EMPTY_METRICS, + node_metrics => [ + #{ + node => node(), + metrics => ?EMPTY_METRICS + } + ] +}). diff --git a/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl b/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl index 0065db56b..e69de29bb 100644 --- a/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl +++ b/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl @@ -1,18 +0,0 @@ --define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ - matched => MATCH, - success => SUCC, - failed => FAILED, - rate => RATE, - rate_last5m => RATE_5, - rate_max => RATE_MAX -}). - --define(METRICS_EXAMPLE, #{ - metrics => ?METRICS(0, 0, 0, 0, 0, 0), - node_metrics => [ - #{ - node => node(), - metrics => ?METRICS(0, 0, 0, 0, 0, 0) - } - ] -}). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl index 3b5183150..dfae764c8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl @@ -5,7 +5,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include("emqx_ee_bridge.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 24b4aebb1..6ad804b2c 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -3,7 +3,7 @@ %%-------------------------------------------------------------------- -module(emqx_ee_bridge_influxdb). --include("emqx_ee_bridge.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 6a5d4a3a2..f76eb388e 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -5,7 +5,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include("emqx_ee_bridge.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]).