From c1212c4b6db89f208cba9ba0233a324049650ed9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 4 Jan 2022 19:38:02 +0800 Subject: [PATCH 1/7] refactor(metrics): improve the metrics for bridges/rules --- .../src/emqx_plugin_libs_metrics.erl | 177 +++++++++--------- .../test/emqx_plugin_libs_metrics_SUITE.erl | 87 ++++++--- 2 files changed, 152 insertions(+), 112 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 2a46b470b..38e922bc2 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -28,17 +28,15 @@ , inc/4 , get/3 , get_rate/2 - , create_metrics/2 + , get_all_counters/2 + , create_metrics/3 + , create_metrics/4 , clear_metrics/2 ]). -export([ get_metrics/2 , get_matched/2 - , get_success/2 - , get_failed/2 , inc_matched/2 - , inc_success/2 - , inc_failed/2 ]). %% gen_server callbacks @@ -61,13 +59,14 @@ -export_type([metrics/0]). +-type rate() :: #{ + current => float(), + max => float(), + last5m => float() +}. -type metrics() :: #{ - matched => integer(), - success => integer(), - failed => integer(), - rate => float(), - rate_max => float(), - rate_last5m => float() + counters => #{atom() => integer()}, + rate => #{atom() => rate()} }. -type handler_name() :: atom(). -type metric_id() :: binary(). @@ -107,35 +106,41 @@ child_spec(Name) -> , modules => [emqx_plugin_libs_metrics] }. --spec(create_metrics(handler_name(), metric_id()) -> ok). -create_metrics(Name, Id) -> - gen_server:call(Name, {create_metrics, Id}). +-spec(create_metrics(handler_name(), metric_id(), [atom()]) -> ok). +create_metrics(Name, Id, Metrics) -> + create_metrics(Name, Id, Metrics, Metrics). + +-spec(create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok). +create_metrics(Name, Id, Metrics, RateMetrics) -> + gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). -spec(clear_metrics(handler_name(), metric_id()) -> ok). clear_metrics(Name, Id) -> gen_server:call(Name, {delete_metrics, Id}). --spec(get(handler_name(), metric_id(), atom()) -> number()). +-spec(get(handler_name(), metric_id(), atom() | integer()) -> number()). get(Name, Id, Metric) -> - case get_couters_ref(Name, Id) of + case get_ref(Name, Id) of not_found -> 0; - Ref -> counters:get(Ref, metrics_idx(Metric)) + Ref when is_atom(Metric) -> + counters:get(Ref, idx_metric(Name, Id, Metric)); + Ref when is_integer(Metric) -> + counters:get(Ref, Metric) end. -spec(get_rate(handler_name(), metric_id()) -> map()). get_rate(Name, Id) -> gen_server:call(Name, {get_rate, Id}). +-spec(get_all_counters(handler_name(), metric_id()) -> map()). +get_all_counters(Name, Id) -> + maps:map(fun(_Metric, Index) -> + get(Name, Id, Index) + end, get_indexes(Name, Id)). + -spec(get_metrics(handler_name(), metric_id()) -> metrics()). get_metrics(Name, Id) -> - #{max := Max, current := Current, last5m := Last5M} = get_rate(Name, Id), - #{matched => get_matched(Name, Id), - success => get_success(Name, Id), - failed => get_failed(Name, Id), - rate => Current, - rate_max => Max, - rate_last5m => Last5M - }. + #{rate => get_rate(Name, Id), counters => get_all_counters(Name, Id)}. -spec inc(handler_name(), metric_id(), atom()) -> ok. inc(Name, Id, Metric) -> @@ -143,34 +148,14 @@ inc(Name, Id, Metric) -> -spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok. inc(Name, Id, Metric, Val) -> - case get_couters_ref(Name, Id) of - not_found -> - %% this may occur when increasing a counter for - %% a rule that was created from a remove node. - create_metrics(Name, Id), - counters:add(get_couters_ref(Name, Id), metrics_idx(Metric), Val); - Ref -> - counters:add(Ref, metrics_idx(Metric), Val) - end. + counters:add(get_ref(Name, Id), idx_metric(Name, Id,Metric), Val). inc_matched(Name, Id) -> inc(Name, Id, 'matched', 1). -inc_success(Name, Id) -> - inc(Name, Id, 'success', 1). - -inc_failed(Name, Id) -> - inc(Name, Id, 'failed', 1). - get_matched(Name, Id) -> get(Name, Id, 'matched'). -get_success(Name, Id) -> - get(Name, Id, 'success'). - -get_failed(Name, Id) -> - get(Name, Id, 'failed'). - start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). @@ -181,31 +166,31 @@ init(Name) -> persistent_term:put(?CntrRef(Name), #{}), {ok, #state{}}. -handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> - {reply, format_rate(#rate{}), State}; handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> {reply, case maps:get(Id, Rates, undefined) of - undefined -> format_rate(#rate{}); - Rate -> format_rate(Rate) + undefined -> #{}; + RatesPerId -> format_rates_of_id(RatesPerId) end, State}; -handle_call({create_metrics, Id}, _From, +handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, State = #state{metric_ids = MIDs, rates = Rates}) -> - {reply, create_counters(get_self_name(), Id), + RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]), + Rate1= case Rates of + undefined -> #{Id => RatePerId}; + _ -> Rates#{Id => RatePerId} + end, + {reply, create_counters(get_self_name(), Id, Metrics), State#state{metric_ids = sets:add_element(Id, MIDs), - rates = case Rates of - undefined -> #{Id => #rate{}}; - _ -> Rates#{Id => #rate{}} - end}}; + rates = Rate1}}; handle_call({delete_metrics, Id}, _From, State = #state{metric_ids = MIDs, rates = Rates}) -> {reply, delete_counters(get_self_name(), Id), State#state{metric_ids = sets:del_element(Id, MIDs), - rates = case Rates of - undefined -> undefined; - _ -> maps:remove(Id, Rates) - end}}; + rates = case Rates of + undefined -> undefined; + _ -> maps:remove(Id, Rates) + end}}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -218,10 +203,12 @@ handle_info(ticking, State = #state{rates = undefined}) -> {noreply, State}; handle_info(ticking, State = #state{rates = Rates0}) -> - Rates = maps:map( - fun(Id, Rate) -> - calculate_rate(get_matched(get_self_name(), Id), Rate) - end, Rates0), + Rates = + maps:map(fun(Id, RatesPerID) -> + maps:map(fun(Metric, Rate) -> + calculate_rate(get(get_self_name(), Id, Metric), Rate) + end, RatesPerID) + end, Rates0), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State#state{rates = Rates}}; @@ -243,29 +230,45 @@ stop(Name) -> %% Internal Functions %%------------------------------------------------------------------------------ -create_counters(Name, Id) -> - case get_couters_ref(Name, Id) of - not_found -> - Counters = get_all_counters(Name), - CntrRef = counters:new(max_counters_size(), [write_concurrency]), - persistent_term:put(?CntrRef(Name), Counters#{Id => CntrRef}); - _Ref -> ok - end. +create_counters(_Name, _Id, []) -> + error({create_counter_error, must_provide_a_list_of_metrics}); +create_counters(Name, Id, Metrics) -> + Size = length(Metrics), + Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))), + Counters = get_counters(Name), + CntrRef = counters:new(Size, [write_concurrency]), + persistent_term:put(?CntrRef(Name), + Counters#{Id => #{ref => CntrRef, indexes => Indexes}}). delete_counters(Name, Id) -> - persistent_term:put(?CntrRef(Name), maps:remove(Id, get_all_counters(Name))). + persistent_term:put(?CntrRef(Name), maps:remove(Id, get_counters(Name))). -get_couters_ref(Name, Id) -> - maps:get(Id, get_all_counters(Name), not_found). +get_ref(Name, Id) -> + case maps:find(Id, get_counters(Name)) of + {ok, #{ref := Ref}} -> Ref; + error -> not_found + end. -get_all_counters(Name) -> +idx_metric(Name, Id, Metric) -> + case get_indexes(Name, Id) of + not_found -> not_found; + Indexes -> maps:get(Metric, Indexes, not_found) + end. + +get_indexes(Name, Id) -> + case maps:find(Id, get_counters(Name)) of + {ok, #{indexes := Indexes}} -> Indexes; + error -> not_found + end. + +get_counters(Name) -> persistent_term:get(?CntrRef(Name), #{}). calculate_rate(_CurrVal, undefined) -> undefined; calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal, - tick = Tick, last5m_acc = AccRate5Min0, - last5m_smpl = Last5MinSamples0}) -> + tick = Tick, last5m_acc = AccRate5Min0, + last5m_smpl = Last5MinSamples0}) -> %% calculate the current rate based on the last value of the counter CurrRate = (CurrVal - LastVal) / ?SAMPLING, @@ -292,8 +295,13 @@ calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal, last_v = CurrVal, last5m_acc = Acc5Min, last5m_smpl = Last5MinSamples, tick = Tick + 1}. +format_rates_of_id(RatesPerId) -> + maps:map(fun(_Metric, Rates) -> + format_rate(Rates) + end, RatesPerId). + format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) -> - #{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}. + #{max => precision(Max, 2), current => precision(Current, 2), last5m => precision(Last5Min, 2)}. precision(Float, N) -> Base = math:pow(10, N), @@ -302,14 +310,3 @@ precision(Float, N) -> get_self_name() -> {registered_name, Name} = process_info(self(), registered_name), Name. - -%%------------------------------------------------------------------------------ -%% Metrics Definitions -%%------------------------------------------------------------------------------ - -max_counters_size() -> 32. -metrics_idx('matched') -> 1; -metrics_idx('success') -> 2; -metrics_idx('failed') -> 3; -metrics_idx(_) -> 32. - diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl index 77ad03bfe..1fd4d9ca4 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl @@ -23,22 +23,11 @@ -include_lib("common_test/include/ct.hrl"). all() -> - [ {group, metrics} - , {group, rate} ]. + emqx_common_test_helpers:all(?MODULE). suite() -> [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. -groups() -> - [{metrics, [sequence], - [ t_rule - , t_no_creation_1 - ]}, - {rate, [sequence], - [ rule_rate - ]} - ]. - -define(NAME, ?MODULE). init_per_suite(Config) -> @@ -59,12 +48,66 @@ init_per_testcase(_, Config) -> end_per_testcase(_, _Config) -> ok. -t_no_creation_1(_) -> - ?assertEqual(ok, emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched')). +t_get_metrics(_) -> + Metrics = [a, b, c], + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics), + %% all the metrics are set to zero at start + ?assertMatch(#{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + counters := #{ + a := 0, + b := 0, + c := 0 + } + }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ct:sleep(1500), + ?LET(#{ + rate := #{ + a := #{current := CurrA, max := MaxA, last5m := _}, + b := #{current := CurrB, max := MaxB, last5m := _}, + c := #{current := CurrC, max := MaxC, last5m := _} + }, + counters := #{ + a := 1, + b := 1, + c := 2 + } + }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), + {?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0), + ?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). -t_rule(_) -> - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>), - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>), +t_get_metrics_2(_) -> + Metrics = [a, b, c], + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics, + [a]), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ?assertMatch(#{ + rate := Rate = #{ + a := #{current := _, max := _, last5m := _} + }, + counters := #{ + a := 1, + b := 1, + c := 1 + } + } when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + +t_inc_matched(_) -> + Metrics = ['rules.matched'], + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics), + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>, Metrics), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), @@ -74,20 +117,20 @@ t_rule(_) -> ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>). -rule_rate(_) -> - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>), - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>), +t_rate(_) -> + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, ['rules.matched']), + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'), ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), ct:sleep(1000), - ?LET(#{max := Max, current := Current}, + ?LET(#{'rules.matched' := #{max := Max, current := Current}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current =< 2)}), ct:sleep(2100), - ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), + ?LET(#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)}), From ab5ad22b1dcdd915e391787242c052b6b71212c0 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 5 Jan 2022 10:17:55 +0800 Subject: [PATCH 2/7] fix(metrics): update the calls to emqx_plugin_libs_metrcis module --- apps/emqx_resource/src/emqx_resource_instance.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 2252429a7..38681158c 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -176,7 +176,7 @@ do_create(InstId, ResourceType, Config, Opts) -> {error, not_found} -> case do_start(InstId, ResourceType, Config, Opts) of ok -> - ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), + ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), {ok, force_lookup(InstId)}; Error -> Error From 78aa0abd3d14c870dfc221cef4ea3b5f4992d43e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 5 Jan 2022 10:17:55 +0800 Subject: [PATCH 3/7] fix(metrics): update the calls to emqx_plugin_libs_metrcis module --- .../src/emqx_plugin_libs_metrics.erl | 34 ++++++++----------- apps/emqx_resource/src/emqx_resource.erl | 6 ++-- .../src/emqx_resource_instance.erl | 3 +- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 38e922bc2..c803e375a 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -35,8 +35,6 @@ ]). -export([ get_metrics/2 - , get_matched/2 - , inc_matched/2 ]). %% gen_server callbacks @@ -74,7 +72,6 @@ -define(CntrRef(Name), {?MODULE, Name}). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). -%% the rate of 'matched' -record(rate, { max = 0 :: number(), current = 0 :: number(), @@ -106,11 +103,11 @@ child_spec(Name) -> , modules => [emqx_plugin_libs_metrics] }. --spec(create_metrics(handler_name(), metric_id(), [atom()]) -> ok). +-spec(create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}). create_metrics(Name, Id, Metrics) -> create_metrics(Name, Id, Metrics, Metrics). --spec(create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok). +-spec(create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}). create_metrics(Name, Id, Metrics, RateMetrics) -> gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). @@ -150,12 +147,6 @@ inc(Name, Id, Metric) -> inc(Name, Id, Metric, Val) -> counters:add(get_ref(Name, Id), idx_metric(Name, Id,Metric), Val). -inc_matched(Name, Id) -> - inc(Name, Id, 'matched', 1). - -get_matched(Name, Id) -> - get(Name, Id, 'matched'). - start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). @@ -174,14 +165,19 @@ handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, State = #state{metric_ids = MIDs, rates = Rates}) -> - RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]), - Rate1= case Rates of - undefined -> #{Id => RatePerId}; - _ -> Rates#{Id => RatePerId} - end, - {reply, create_counters(get_self_name(), Id, Metrics), - State#state{metric_ids = sets:add_element(Id, MIDs), - rates = Rate1}}; + case RateMetrics -- Metrics of + [] -> + RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]), + Rate1 = case Rates of + undefined -> #{Id => RatePerId}; + _ -> Rates#{Id => RatePerId} + end, + {reply, create_counters(get_self_name(), Id, Metrics), + State#state{metric_ids = sets:add_element(Id, MIDs), + rates = Rate1}}; + _ -> + {reply, {error, metrics_to}, State} + end. handle_call({delete_metrics, Id}, _From, State = #state{metric_ids = MIDs, rates = Rates}) -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 58c2d9956..704c1a4d3 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -346,9 +346,9 @@ filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. inc_metrics_funcs(InstId) -> - OnFailed = [{fun emqx_plugin_libs_metrics:inc_failed/2, [resource_metrics, InstId]}], - OnSucc = [ {fun emqx_plugin_libs_metrics:inc_matched/2, [resource_metrics, InstId]} - , {fun emqx_plugin_libs_metrics:inc_success/2, [resource_metrics, InstId]} + OnFailed = [{fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, InstId, failed]}], + OnSucc = [ {fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, InstId, matched]} + , {fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, success]} ], {OnSucc, OnFailed}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 38681158c..18a8c8580 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -176,7 +176,8 @@ do_create(InstId, ResourceType, Config, Opts) -> {error, not_found} -> case do_start(InstId, ResourceType, Config, Opts) of ok -> - ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), + ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, + [matched, success, failed], [matched]), {ok, force_lookup(InstId)}; Error -> Error From 67a60e1153dd9db3479f078b99acb12b9c5d95fc Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 5 Jan 2022 20:05:09 +0800 Subject: [PATCH 4/7] refactor(rule): add more metrics for rule and bridges --- apps/emqx_bridge/src/emqx_bridge_api.erl | 9 +- .../test/emqx_connector_api_SUITE.erl | 36 +++++++- .../src/emqx_plugin_libs_metrics.erl | 40 +++++---- .../test/emqx_plugin_libs_metrics_SUITE.erl | 29 +++++++ apps/emqx_resource/src/emqx_resource.erl | 12 ++- .../src/emqx_resource_instance.erl | 2 +- .../emqx_rule_engine/src/emqx_rule_engine.erl | 18 ++-- .../src/emqx_rule_engine_api.erl | 82 +++++++++++++++---- .../src/emqx_rule_runtime.erl | 44 +++++++--- .../src/emqx_rule_sqltester.erl | 4 +- 10 files changed, 212 insertions(+), 64 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index e080015e2..926bd3612 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -418,9 +418,16 @@ format_resp(#{id := Id, raw_config := RawConf, name => maps:get(<<"name">>, RawConf, BridgeName), node => node(), status => IsConnected(Status), - metrics => Metrics + metrics => format_metrics(Metrics) }. +format_metrics(#{ + counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ}, + rate := #{ + matched := #{current := Rate, last5m := Rate5m, max := RateMax} + } }) -> + ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax). + rpc_multicall(Func, Args) -> Nodes = mria_mnesia:running_nodes(), ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 2819cbbc0..821d02278 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -523,7 +523,21 @@ t_ingress_mqtt_bridge_with_rules(_) -> %% and also the rule should be matched, with matched + 1: {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId - , <<"metrics">> := #{<<"matched">> := 1} + , <<"metrics">> := #{ + <<"matched">> := 1, + <<"passed">> := 1, + <<"failed">> := 0, + <<"failed.exception">> := 0, + <<"failed.no_result">> := 0, + <<"rate">> := _, + <<"rate_max">> := _, + <<"rate_last5m">> := _, + <<"outputs.total">> := 1, + <<"outputs.success">> := 1, + <<"outputs.failed">> := 0, + <<"outputs.failed.out_of_service">> := 0, + <<"outputs.failed.unknown">> := 0 + } } = jsx:decode(Rule1), %% we also check if the outputs of the rule is triggered ?assertMatch(#{inspect := #{ @@ -578,7 +592,7 @@ t_egress_mqtt_bridge_with_rules(_) -> ?assert( receive {deliver, RemoteTopic, #message{payload = Payload}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]), true; Msg -> ct:pal("Msg: ~p", [Msg]), @@ -598,13 +612,27 @@ t_egress_mqtt_bridge_with_rules(_) -> emqx:publish(emqx_message:make(RuleTopic, Payload2)), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId - , <<"metrics">> := #{<<"matched">> := 1} + , <<"metrics">> := #{ + <<"matched">> := 1, + <<"passed">> := 1, + <<"failed">> := 0, + <<"failed.exception">> := 0, + <<"failed.no_result">> := 0, + <<"rate">> := _, + <<"rate_max">> := _, + <<"rate_last5m">> := _, + <<"outputs.total">> := 1, + <<"outputs.success">> := 1, + <<"outputs.failed">> := 0, + <<"outputs.failed.out_of_service">> := 0, + <<"outputs.failed.unknown">> := 0 + } } = jsx:decode(Rule1), %% we should receive a message on the "remote" broker, with specified topic ?assert( receive {deliver, RemoteTopic2, #message{payload = Payload2}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), + ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), true; Msg -> ct:pal("Msg: ~p", [Msg]), diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index c803e375a..811a5131c 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -28,7 +28,7 @@ , inc/4 , get/3 , get_rate/2 - , get_all_counters/2 + , get_counters/2 , create_metrics/3 , create_metrics/4 , clear_metrics/2 @@ -129,15 +129,15 @@ get(Name, Id, Metric) -> get_rate(Name, Id) -> gen_server:call(Name, {get_rate, Id}). --spec(get_all_counters(handler_name(), metric_id()) -> map()). -get_all_counters(Name, Id) -> +-spec(get_counters(handler_name(), metric_id()) -> map()). +get_counters(Name, Id) -> maps:map(fun(_Metric, Index) -> get(Name, Id, Index) end, get_indexes(Name, Id)). -spec(get_metrics(handler_name(), metric_id()) -> metrics()). get_metrics(Name, Id) -> - #{rate => get_rate(Name, Id), counters => get_all_counters(Name, Id)}. + #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}. -spec inc(handler_name(), metric_id(), atom()) -> ok. inc(Name, Id, Metric) -> @@ -145,7 +145,7 @@ inc(Name, Id, Metric) -> -spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok. inc(Name, Id, Metric, Val) -> - counters:add(get_ref(Name, Id), idx_metric(Name, Id,Metric), Val). + counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val). start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). @@ -176,8 +176,8 @@ handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, State#state{metric_ids = sets:add_element(Id, MIDs), rates = Rate1}}; _ -> - {reply, {error, metrics_to}, State} - end. + {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State} + end; handle_call({delete_metrics, Id}, _From, State = #state{metric_ids = MIDs, rates = Rates}) -> @@ -229,35 +229,39 @@ stop(Name) -> create_counters(_Name, _Id, []) -> error({create_counter_error, must_provide_a_list_of_metrics}); create_counters(Name, Id, Metrics) -> + %% backup the old counters + OlderCounters = maps:with(Metrics, get_counters(Name, Id)), + %% create the new counter Size = length(Metrics), Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))), - Counters = get_counters(Name), + Counters = get_pterm(Name), CntrRef = counters:new(Size, [write_concurrency]), persistent_term:put(?CntrRef(Name), - Counters#{Id => #{ref => CntrRef, indexes => Indexes}}). + Counters#{Id => #{ref => CntrRef, indexes => Indexes}}), + %% restore the old counters + lists:foreach(fun({Metric, N}) -> + inc(Name, Id, Metric, N) + end, maps:to_list(OlderCounters)). delete_counters(Name, Id) -> - persistent_term:put(?CntrRef(Name), maps:remove(Id, get_counters(Name))). + persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))). get_ref(Name, Id) -> - case maps:find(Id, get_counters(Name)) of + case maps:find(Id, get_pterm(Name)) of {ok, #{ref := Ref}} -> Ref; error -> not_found end. idx_metric(Name, Id, Metric) -> - case get_indexes(Name, Id) of - not_found -> not_found; - Indexes -> maps:get(Metric, Indexes, not_found) - end. + maps:get(Metric, get_indexes(Name, Id)). get_indexes(Name, Id) -> - case maps:find(Id, get_counters(Name)) of + case maps:find(Id, get_pterm(Name)) of {ok, #{indexes := Indexes}} -> Indexes; - error -> not_found + error -> #{} end. -get_counters(Name) -> +get_pterm(Name) -> persistent_term:get(?CntrRef(Name), #{}). calculate_rate(_CurrVal, undefined) -> diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl index 1fd4d9ca4..25c253a9a 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl @@ -104,6 +104,35 @@ t_get_metrics_2(_) -> } when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). +t_recreate_metrics(_) -> + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a]), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), + ?assertMatch(#{ + rate := R = #{ + a := #{current := _, max := _, last5m := _} + }, + counters := C = #{ + a := 1 + } + } when map_size(R) == 1 andalso map_size(C) == 1, + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + %% we create the metrics again, to add some counters + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a, b, c]), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ?assertMatch(#{ + rate := R = #{ + a := #{current := _, max := _, last5m := _}, + b := #{current := _, max := _, last5m := _}, + c := #{current := _, max := _, last5m := _} + }, + counters := C = #{ + a := 1, b := 1, c := 1 + } + } when map_size(R) == 3 andalso map_size(C) == 3, + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + t_inc_matched(_) -> Metrics = ['rules.matched'], ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 704c1a4d3..d7066ef87 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -205,7 +205,12 @@ query(InstId, Request, AfterQuery) -> {ok, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe - Mod:on_query(InstId, Request, AfterQuery, ResourceState); + ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched), + try Mod:on_query(InstId, Request, AfterQuery, ResourceState) + catch Err:Reason:ST -> + emqx_plugin_libs_metrics:inc(resource_metrics, InstId, exception), + erlang:raise(Err, Reason, ST) + end; {error, not_found} -> query_error(not_found, <<"the resource id not exists">>) end. @@ -346,9 +351,8 @@ filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. inc_metrics_funcs(InstId) -> - OnFailed = [{fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, InstId, failed]}], - OnSucc = [ {fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, InstId, matched]} - , {fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, success]} + OnFailed = [{fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, failed]}], + OnSucc = [ {fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, success]} ], {OnSucc, OnFailed}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 18a8c8580..74429fcc8 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -177,7 +177,7 @@ do_create(InstId, ResourceType, Config, Opts) -> case do_start(InstId, ResourceType, Config, Opts) of ok -> ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, - [matched, success, failed], [matched]), + [matched, success, failed, exception], [matched]), {ok, force_lookup(InstId)}; Error -> Error diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 9de4d1614..99f9050a7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -71,6 +71,14 @@ -define(T_CALL, infinity). +%% NOTE: This order cannot be changed! This is to make the metric working during relup. +%% Append elements to this list to add new metrics. +-define(METRICS, ['matched', 'passed', 'failed', 'failed.exception', 'failed.no_result', + 'outputs.total', 'outputs.success', 'outputs.failed', 'outputs.failed.out_of_service', + 'outputs.failed.unknown']). + +-define(RATE_METRICS, ['matched']). + config_key_path() -> [rule_engine, rules]. @@ -162,10 +170,10 @@ get_rule(Id) -> load_hooks_for_rule(#{from := Topics}) -> lists:foreach(fun emqx_rule_events:load/1, Topics). -add_metrics_for_rule(#{id := Id}) -> - ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id). +add_metrics_for_rule(Id) -> + ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS). -clear_metrics_for_rule(#{id := Id}) -> +clear_metrics_for_rule(Id) -> ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id). unload_hooks_for_rule(#{id := Id, from := Topics}) -> @@ -243,7 +251,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> do_insert_rule(#{id := Id} = Rule) -> ok = load_hooks_for_rule(Rule), - ok = add_metrics_for_rule(Rule), + ok = add_metrics_for_rule(Id), true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}), ok. @@ -251,7 +259,7 @@ do_delete_rule(RuleId) -> case get_rule(RuleId) of {ok, Rule} -> ok = unload_hooks_for_rule(Rule), - ok = clear_metrics_for_rule(Rule), + ok = clear_metrics_for_rule(RuleId), true = ets:delete(?RULE_TAB, RuleId), ok; not_found -> ok diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 15f407ab1..e57fd5ecd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -43,6 +43,40 @@ {error, REASON} -> {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}} end). +-define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, + O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), + #{ + matched => MATCH, + passed => PASS, + failed => FAIL, + 'failed.exception' => FAIL_EX, + 'failed.no_result' => FAIL_NORES, + 'outputs.total' => O_TOTAL, + 'outputs.failed' => O_FAIL, + 'outputs.failed.out_of_service' => O_FAIL_OOS, + 'outputs.failed.unknown' => O_FAIL_UNKNOWN, + 'outputs.success' => O_SUCC, + rate => RATE, + rate_max => RATE_MAX, + rate_last5m => RATE_5 + }). +-define(metrics(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, + O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), + #{ + matched := MATCH, + passed := PASS, + failed := FAIL, + 'failed.exception' := FAIL_EX, + 'failed.no_result' := FAIL_NORES, + 'outputs.total' := O_TOTAL, + 'outputs.failed' := O_FAIL, + 'outputs.failed.out_of_service' := O_FAIL_OOS, + 'outputs.failed.unknown' := O_FAIL_UNKNOWN, + 'outputs.success' := O_SUCC, + rate := RATE, + rate_max := RATE_MAX, + rate_last5m := RATE_5 + }). namespace() -> "rule". @@ -268,17 +302,23 @@ printable_function_name(Mod, Func) -> list_to_binary(lists:concat([Mod,":",Func])). get_rule_metrics(Id) -> - Format = fun (Node, #{matched := Matched, - rate := Current, - rate_max := Max, - rate_last5m := Last5M - }) -> - #{ metrics => #{ - matched => Matched, - rate => Current, - rate_max => Max, - rate_last5m => Last5M - } + Format = fun (Node, #{ + counters := + #{matched := Matched, passed := Passed, failed := Failed, + 'failed.exception' := FailedEx, + 'failed.no_result' := FailedNoRes, + 'outputs.total' := OTotal, + 'outputs.failed' := OFailed, + 'outputs.failed.out_of_service' := OFailedOOS, + 'outputs.failed.unknown' := OFailedUnknown, + 'outputs.success' := OFailedSucc + }, + rate := + #{matched := + #{current := Current, max := Max, last5m := Last5M} + }}) -> + #{ metrics => ?METRICS(Matched, Passed, Failed, FailedEx, FailedNoRes, + OTotal, OFailed, OFailedOOS, OFailedUnknown, OFailedSucc, Current, Max, Last5M) , node => Node } end, @@ -286,13 +326,21 @@ get_rule_metrics(Id) -> || Node <- mria_mnesia:running_nodes()]. aggregate_metrics(AllMetrics) -> - InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0}, + InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), lists:foldl(fun - (#{metrics := #{matched := Match1, rate := Rate1, - rate_max := RateMax1, rate_last5m := Rate5m1}}, - #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) -> - #{matched => Match1 + Match0, rate => Rate1 + Rate0, - rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0} + (#{metrics := ?metrics(Match1, Passed1, Failed1, FailedEx1, FailedNoRes1, + OTotal1, OFailed1, OFailedOOS1, OFailedUnknown1, OFailedSucc1, + Rate1, RateMax1, Rate5m1)}, + ?metrics(Match0, Passed0, Failed0, FailedEx0, FailedNoRes0, + OTotal0, OFailed0, OFailedOOS0, OFailedUnknown0, OFailedSucc0, + Rate0, RateMax0, Rate5m0)) -> + ?METRICS(Match1 + Match0, Passed1 + Passed0, Failed1 + Failed0, + FailedEx1 + FailedEx0, FailedNoRes1 + FailedNoRes0, + OTotal1 + OTotal0, OFailed1 + OFailed0, + OFailedOOS1 + OFailedOOS0, + OFailedUnknown1 + OFailedUnknown0, + OFailedSucc1 + OFailedSucc0, + Rate1 + Rate0, RateMax1 + RateMax0, Rate5m1 + Rate5m0) end, InitMetrics, AllMetrics). get_one_rule(AllRules, Id) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 2256dcdd4..e30a34b84 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -55,18 +55,23 @@ apply_rules([Rule = #{id := RuleID}|More], Input) -> catch %% ignore the errors if select or match failed _:{select_and_transform_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ?SLOG(warning, #{msg => "SELECT_clause_exception", rule_id => RuleID, reason => Error}); _:{match_conditions_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ?SLOG(warning, #{msg => "WHERE_clause_exception", rule_id => RuleID, reason => Error}); _:{select_and_collect_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ?SLOG(warning, #{msg => "FOREACH_clause_exception", rule_id => RuleID, reason => Error}); _:{match_incase_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ?SLOG(warning, #{msg => "INCASE_clause_exception", rule_id => RuleID, reason => Error}); Class:Error:StkTrace -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ?SLOG(error, #{msg => "apply_rule_failed", rule_id => RuleID, exception => Class, @@ -81,6 +86,7 @@ apply_rule_discard_result(Rule, Input) -> ok. apply_rule(Rule = #{id := RuleID}, Input) -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, matched), clear_rule_payload(), do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). @@ -99,10 +105,16 @@ do_apply_rule(#{ case ?RAISE(match_conditions(Conditions, ColumnsAndSelected), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), Collection2 = filter_collection(Input, InCase, DoEach, Collection), + case Collection2 of + [] -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'); + _ -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed) + end, {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end; @@ -117,9 +129,10 @@ do_apply_rule(#{id := RuleId, case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed), {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end. @@ -235,23 +248,30 @@ handle_output_list(RuleId, Outputs, Selected, Envs) -> [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. handle_output(RuleId, OutId, Selected, Envs) -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.total'), try - do_handle_output(OutId, Selected, Envs) + Result = do_handle_output(OutId, Selected, Envs), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'), + Result catch + throw:Reason -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.out_of_service'), + ?SLOG(error, #{msg => "output_failed", output => OutId, reason => Reason}); Err:Reason:ST -> - ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId), - Level = case Err of throw -> debug; _ -> error end, - ?SLOG(Level, #{msg => "output_failed", - output => OutId, - exception => Err, - reason => Reason, - stacktrace => ST - }) + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), + ?SLOG(error, #{msg => "output_failed", output => OutId, exception => Err, + reason => Reason, stacktrace => ST}) end. do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), - emqx_bridge:send_message(BridgeId, Selected); + case emqx_bridge:send_message(BridgeId, Selected) of + {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped -> + throw(bridge_out_of_service); + Result -> Result + end; do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> Mod:Func(Selected, Envs, Args). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index bb4e5b0ef..4aa517fef 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -41,7 +41,7 @@ test(#{sql := Sql, context := Context}) -> test_rule(Sql, Select, Context, EventTopics) -> RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]), - ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, RuleId), + ok = emqx_rule_engine:add_metrics_for_rule(RuleId), Rule = #{ id => RuleId, sql => Sql, @@ -62,7 +62,7 @@ test_rule(Sql, Select, Context, EventTopics) -> {ok, Data} -> {ok, flatten(Data)}; {error, nomatch} -> {error, nomatch} after - ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, RuleId) + ok = emqx_rule_engine:clear_metrics_for_rule(RuleId) end. get_selected_data(Selected, _Envs, _Args) -> From 72d55c8c0d6fbdbc7168f85919bee679f4821790 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 6 Jan 2022 15:46:32 +0800 Subject: [PATCH 5/7] fix(rules): improve the names of the metrics --- .../src/mqtt/emqx_connector_mqtt_mod.erl | 9 +++-- .../src/mqtt/emqx_connector_mqtt_msg.erl | 6 +-- .../test/emqx_connector_api_SUITE.erl | 32 +++++++-------- .../src/emqx_rule_api_schema.erl | 38 ++++++++++++++++-- .../emqx_rule_engine/src/emqx_rule_engine.erl | 16 ++++++-- .../src/emqx_rule_engine_api.erl | 40 +++++++++---------- .../src/emqx_rule_runtime.erl | 29 +++++++------- 7 files changed, 105 insertions(+), 65 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 2b2bad59d..5cc6c7796 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -165,7 +165,7 @@ handle_publish(Msg, undefined) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(Msg0, Vars) -> +handle_publish(#{properties := Props} = Msg0, Vars) -> Msg = format_msg_received(Msg0), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), @@ -174,7 +174,7 @@ handle_publish(Msg0, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - maybe_publish_to_local_broker(Msg0, Vars). + maybe_publish_to_local_broker(Msg, Vars, Props). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. @@ -195,14 +195,15 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) -> process_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). -maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars, + Props) -> case maps:get(local_topic, Vars, undefined) of undefined -> ok; %% local topic is not set, discard it _ -> case emqx_topic:match(Topic, SubTopic) of true -> - _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)), ok; false -> ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 449837d51..e8e4580f4 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -20,7 +20,7 @@ , from_binary/1 , make_pub_vars/2 , to_remote_msg/2 - , to_broker_msg/2 + , to_broker_msg/3 , estimate_size/1 ]). @@ -78,9 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, +to_broker_msg(#{dup := Dup} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, - local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> + local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}, Props) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 821d02278..e7277083e 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -524,14 +524,14 @@ t_ingress_mqtt_bridge_with_rules(_) -> {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId , <<"metrics">> := #{ - <<"matched">> := 1, - <<"passed">> := 1, - <<"failed">> := 0, - <<"failed.exception">> := 0, - <<"failed.no_result">> := 0, - <<"rate">> := _, - <<"rate_max">> := _, - <<"rate_last5m">> := _, + <<"sql.matched">> := 1, + <<"sql.passed">> := 1, + <<"sql.failed">> := 0, + <<"sql.failed.exception">> := 0, + <<"sql.failed.no_result">> := 0, + <<"sql.matched.rate">> := _, + <<"sql.matched.rate.max">> := _, + <<"sql.matched.rate.last5m">> := _, <<"outputs.total">> := 1, <<"outputs.success">> := 1, <<"outputs.failed">> := 0, @@ -613,14 +613,14 @@ t_egress_mqtt_bridge_with_rules(_) -> {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId , <<"metrics">> := #{ - <<"matched">> := 1, - <<"passed">> := 1, - <<"failed">> := 0, - <<"failed.exception">> := 0, - <<"failed.no_result">> := 0, - <<"rate">> := _, - <<"rate_max">> := _, - <<"rate_last5m">> := _, + <<"sql.matched">> := 1, + <<"sql.passed">> := 1, + <<"sql.failed">> := 0, + <<"sql.failed.exception">> := 0, + <<"sql.failed.no_result">> := 0, + <<"sql.matched.rate">> := _, + <<"sql.matched.rate.max">> := _, + <<"sql.matched.rate.last5m">> := _, <<"outputs.total">> := 1, <<"outputs.success">> := 1, <<"outputs.failed">> := 0, diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index d992cdc07..7af10a342 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -81,11 +81,41 @@ fields("rule_test") -> ]; fields("metrics") -> - [ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})} - , {"rate", sc(float(), #{desc => "The rate of matched, times/second"})} - , {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})} - , {"rate_last5m", sc(float(), + [ {"sql.matched", sc(integer(), #{ + desc => "How much times the FROM clause of the SQL is matched." + })} + , {"sql.matched.rate", sc(float(), #{desc => "The rate of matched, times/second"})} + , {"sql.matched.rate.max", sc(float(), #{desc => "The max rate of matched, times/second"})} + , {"sql.matched.rate.last5m", sc(float(), #{desc => "The average rate of matched in last 5 mins, times/second"})} + , {"sql.passed", sc(integer(), #{desc => "How much times the SQL is passed"})} + , {"sql.failed", sc(integer(), #{desc => "How much times the SQL is failed"})} + , {"sql.failed.exception", sc(integer(), #{ + desc => "How much times the SQL is failed due to exceptions. " + "This may because of a crash when calling a SQL function, or " + "trying to do arithmetic operation on undefined variables" + })} + , {"sql.failed.unknown", sc(integer(), #{ + desc => "How much times the SQL is failed due to an unknown error." + })} + , {"outputs.total", sc(integer(), #{ + desc => "How much times the outputs are called by the rule. " + "This value may serveral times of 'sql.matched', depending on the " + "number of the outputs of the rule." + })} + , {"outputs.success", sc(integer(), #{ + desc => "How much times the rule success to call the outputs." + })} + , {"outputs.failed", sc(integer(), #{ + desc => "How much times the rule failed to call the outputs." + })} + , {"outputs.failed.out_of_service", sc(integer(), #{ + desc => "How much times the rule failed to call outputs due to the output is " + "out of service. For example, a bridge is disabled or stopped." + })} + , {"outputs.failed.unknown", sc(integer(), #{ + desc => "How much times the rule failed to call outputs due to to an unknown error." + })} ]; fields("node_metrics") -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 99f9050a7..8749fea13 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -73,11 +73,19 @@ %% NOTE: This order cannot be changed! This is to make the metric working during relup. %% Append elements to this list to add new metrics. --define(METRICS, ['matched', 'passed', 'failed', 'failed.exception', 'failed.no_result', - 'outputs.total', 'outputs.success', 'outputs.failed', 'outputs.failed.out_of_service', - 'outputs.failed.unknown']). +-define(METRICS, [ 'sql.matched' + , 'sql.passed' + , 'sql.failed' + , 'sql.failed.exception' + , 'sql.failed.no_result' + , 'outputs.total' + , 'outputs.success' + , 'outputs.failed' + , 'outputs.failed.out_of_service' + , 'outputs.failed.unknown' + ]). --define(RATE_METRICS, ['matched']). +-define(RATE_METRICS, ['sql.matched']). config_key_path() -> [rule_engine, rules]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index e57fd5ecd..b983747e9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -46,36 +46,36 @@ -define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), #{ - matched => MATCH, - passed => PASS, - failed => FAIL, - 'failed.exception' => FAIL_EX, - 'failed.no_result' => FAIL_NORES, + 'sql.matched' => MATCH, + 'sql.passed' => PASS, + 'sql.failed' => FAIL, + 'sql.failed.exception' => FAIL_EX, + 'sql.failed.no_result' => FAIL_NORES, 'outputs.total' => O_TOTAL, 'outputs.failed' => O_FAIL, 'outputs.failed.out_of_service' => O_FAIL_OOS, 'outputs.failed.unknown' => O_FAIL_UNKNOWN, 'outputs.success' => O_SUCC, - rate => RATE, - rate_max => RATE_MAX, - rate_last5m => RATE_5 + 'sql.matched.rate' => RATE, + 'sql.matched.rate.max' => RATE_MAX, + 'sql.matched.rate.last5m' => RATE_5 }). -define(metrics(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), #{ - matched := MATCH, - passed := PASS, - failed := FAIL, - 'failed.exception' := FAIL_EX, - 'failed.no_result' := FAIL_NORES, + 'sql.matched' := MATCH, + 'sql.passed' := PASS, + 'sql.failed' := FAIL, + 'sql.failed.exception' := FAIL_EX, + 'sql.failed.no_result' := FAIL_NORES, 'outputs.total' := O_TOTAL, 'outputs.failed' := O_FAIL, 'outputs.failed.out_of_service' := O_FAIL_OOS, 'outputs.failed.unknown' := O_FAIL_UNKNOWN, 'outputs.success' := O_SUCC, - rate := RATE, - rate_max := RATE_MAX, - rate_last5m := RATE_5 + 'sql.matched.rate' := RATE, + 'sql.matched.rate.max' := RATE_MAX, + 'sql.matched.rate.last5m' := RATE_5 }). namespace() -> "rule". @@ -304,9 +304,9 @@ printable_function_name(Mod, Func) -> get_rule_metrics(Id) -> Format = fun (Node, #{ counters := - #{matched := Matched, passed := Passed, failed := Failed, - 'failed.exception' := FailedEx, - 'failed.no_result' := FailedNoRes, + #{'sql.matched' := Matched, 'sql.passed' := Passed, 'sql.failed' := Failed, + 'sql.failed.exception' := FailedEx, + 'sql.failed.no_result' := FailedNoRes, 'outputs.total' := OTotal, 'outputs.failed' := OFailed, 'outputs.failed.out_of_service' := OFailedOOS, @@ -314,7 +314,7 @@ get_rule_metrics(Id) -> 'outputs.success' := OFailedSucc }, rate := - #{matched := + #{'sql.matched' := #{current := Current, max := Max, last5m := Last5M} }}) -> #{ metrics => ?METRICS(Matched, Passed, Failed, FailedEx, FailedNoRes, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index e30a34b84..080a36511 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -55,23 +55,23 @@ apply_rules([Rule = #{id := RuleID}|More], Input) -> catch %% ignore the errors if select or match failed _:{select_and_transform_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "SELECT_clause_exception", rule_id => RuleID, reason => Error}); _:{match_conditions_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "WHERE_clause_exception", rule_id => RuleID, reason => Error}); _:{select_and_collect_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "FOREACH_clause_exception", rule_id => RuleID, reason => Error}); _:{match_incase_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "INCASE_clause_exception", rule_id => RuleID, reason => Error}); Class:Error:StkTrace -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(error, #{msg => "apply_rule_failed", rule_id => RuleID, exception => Class, @@ -86,7 +86,7 @@ apply_rule_discard_result(Rule, Input) -> ok. apply_rule(Rule = #{id := RuleID}, Input) -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, matched), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'), clear_rule_payload(), do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). @@ -108,13 +108,13 @@ do_apply_rule(#{ Collection2 = filter_collection(Input, InCase, DoEach, Collection), case Collection2 of [] -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'); + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'); _ -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed) + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed') end, {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'), {error, nomatch} end; @@ -129,10 +129,10 @@ do_apply_rule(#{id := RuleId, case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed'), {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'), {error, nomatch} end. @@ -254,10 +254,10 @@ handle_output(RuleId, OutId, Selected, Envs) -> ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'), Result catch - throw:Reason -> + throw:out_of_service -> ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.out_of_service'), - ?SLOG(error, #{msg => "output_failed", output => OutId, reason => Reason}); + ?SLOG(warning, #{msg => "out_of_service", output => OutId}); Err:Reason:ST -> ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), @@ -269,10 +269,11 @@ do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), case emqx_bridge:send_message(BridgeId, Selected) of {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped -> - throw(bridge_out_of_service); + throw(out_of_service); Result -> Result end; do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> + %% the function can also throw 'out_of_service' Mod:Func(Selected, Envs, Args). eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> From 7dcb9567e7588988ce976cf9338765646c56199d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 7 Jan 2022 14:34:59 +0800 Subject: [PATCH 6/7] fix(bridge): get bridges from all nodes crash --- apps/emqx_bridge/src/emqx_bridge_api.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 926bd3612..f1335b502 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -18,6 +18,7 @@ -behaviour(minirest_api). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). -import(hoconsc, [mk/2, array/1, enum/1]). @@ -371,8 +372,12 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> pick_bridges_by_id(Id, BridgesAllNodes) -> lists:foldl(fun(BridgesOneNode, Acc) -> - [BridgeInfo] = [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id], - [BridgeInfo | Acc] + case [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id] of + [BridgeInfo] -> [BridgeInfo | Acc]; + [] -> + ?SLOG(warning, #{msg => "bridge_inconsistent_in_cluster", bridge => Id}), + Acc + end end, [], BridgesAllNodes). format_bridge_info([FirstBridge | _] = Bridges) -> From 2a2a00e0ad34b2d30387538e2f4d3dce640af5ab Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 11 Jan 2022 14:44:08 +0800 Subject: [PATCH 7/7] fix(rule): remove rules from all nodes in the cluster --- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 2 +- apps/emqx_rule_engine/src/emqx_rule_engine_api.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 8749fea13..60befa3ab 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -119,7 +119,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> load_rules() -> maps_foreach(fun({Id, Rule}) -> {ok, _} = create_rule(Rule#{id => bin(Id)}) - end, emqx_conf:get([rule_engine, rules], #{})). + end, emqx:get_config([rule_engine, rules], #{})). -spec create_rule(map()) -> {ok, rule()} | {error, term()}. create_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index b983747e9..6f1da4e88 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -246,7 +246,7 @@ param_path_id() -> '/rules/:id'(delete, #{bindings := #{id := Id}}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], - case emqx:remove_config(ConfPath, #{}) of + case emqx_conf:remove(ConfPath, #{}) of {ok, _} -> {204}; {error, Reason} -> ?SLOG(error, #{msg => "delete_rule_failed",