Merge pull request #7775 from zmstone/0426-5.0-hint-metrics-merge-errors
fix: hint metrics merge errors
This commit is contained in:
commit
3524fb6994
|
@ -30,7 +30,8 @@
|
||||||
binary_string/1,
|
binary_string/1,
|
||||||
deep_convert/3,
|
deep_convert/3,
|
||||||
diff_maps/2,
|
diff_maps/2,
|
||||||
merge_with/3
|
merge_with/3,
|
||||||
|
best_effort_recursive_sum/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([config_key/0, config_key_path/0]).
|
-export_type([config_key/0, config_key_path/0]).
|
||||||
|
@ -242,10 +243,8 @@ merge_with(Combiner, Map1, Map2) when
|
||||||
)
|
)
|
||||||
end;
|
end;
|
||||||
merge_with(Combiner, Map1, Map2) ->
|
merge_with(Combiner, Map1, Map2) ->
|
||||||
error_with_info(
|
ErrorType = error_type_merge_intersect(Map1, Map2, Combiner),
|
||||||
error_type_merge_intersect(Map1, Map2, Combiner),
|
throw(#{maps_merge_error => ErrorType, args => [Map1, Map2]}).
|
||||||
[Combiner, Map1, Map2]
|
|
||||||
).
|
|
||||||
|
|
||||||
merge_with_t({K, V2, Iterator}, Map1, Map2, Combiner) ->
|
merge_with_t({K, V2, Iterator}, Map1, Map2, Combiner) ->
|
||||||
case Map1 of
|
case Map1 of
|
||||||
|
@ -261,12 +260,26 @@ merge_with_t(none, Result, _, _) ->
|
||||||
error_type_merge_intersect(M1, M2, Combiner) when is_function(Combiner, 3) ->
|
error_type_merge_intersect(M1, M2, Combiner) when is_function(Combiner, 3) ->
|
||||||
error_type_two_maps(M1, M2);
|
error_type_two_maps(M1, M2);
|
||||||
error_type_merge_intersect(_M1, _M2, _Combiner) ->
|
error_type_merge_intersect(_M1, _M2, _Combiner) ->
|
||||||
badarg.
|
badarg_combiner_function.
|
||||||
|
|
||||||
error_with_info(_, _) ->
|
|
||||||
{error_info, #{module => erl_stdlib_errors}}.
|
|
||||||
|
|
||||||
error_type_two_maps(M1, M2) when is_map(M1) ->
|
error_type_two_maps(M1, M2) when is_map(M1) ->
|
||||||
{badmap, M2};
|
{badmap, M2};
|
||||||
error_type_two_maps(M1, _M2) ->
|
error_type_two_maps(M1, _M2) ->
|
||||||
{badmap, M1}.
|
{badmap, M1}.
|
||||||
|
|
||||||
|
%% @doc Sum-merge map values.
|
||||||
|
%% For bad merges, ErrorLogger is called to log the key, and value in M2 is ignored.
|
||||||
|
best_effort_recursive_sum(M1, M2, ErrorLogger) ->
|
||||||
|
F =
|
||||||
|
fun(Key, V1, V2) ->
|
||||||
|
case {erlang:is_map(V1), erlang:is_map(V2)} of
|
||||||
|
{true, true} ->
|
||||||
|
best_effort_recursive_sum(V1, V2, ErrorLogger);
|
||||||
|
{false, false} when is_number(V1) andalso is_number(V2) ->
|
||||||
|
V1 + V2;
|
||||||
|
_ ->
|
||||||
|
ErrorLogger(#{failed_to_merge => Key}),
|
||||||
|
V1
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
merge_with(F, M1, M2).
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_map_lib_tests).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
best_effort_recursive_sum_test_() ->
|
||||||
|
DummyLogger = fun(_) -> ok end,
|
||||||
|
[
|
||||||
|
?_assertEqual(
|
||||||
|
#{foo => 3},
|
||||||
|
emqx_map_lib:best_effort_recursive_sum(#{foo => 1}, #{foo => 2}, DummyLogger)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
#{foo => 3, bar => 6.0},
|
||||||
|
emqx_map_lib:best_effort_recursive_sum(
|
||||||
|
#{foo => 1, bar => 2.0}, #{foo => 2, bar => 4.0}, DummyLogger
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
#{foo => 1, bar => 2},
|
||||||
|
emqx_map_lib:best_effort_recursive_sum(#{foo => 1}, #{bar => 2}, DummyLogger)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
#{foo => #{bar => 42}},
|
||||||
|
emqx_map_lib:best_effort_recursive_sum(
|
||||||
|
#{foo => #{bar => 2}}, #{foo => #{bar => 40}}, DummyLogger
|
||||||
|
)
|
||||||
|
),
|
||||||
|
fun() ->
|
||||||
|
Self = self(),
|
||||||
|
Logger = fun(What) -> Self ! {log, What} end,
|
||||||
|
?assertEqual(
|
||||||
|
#{foo => 1, bar => 2},
|
||||||
|
emqx_map_lib:best_effort_recursive_sum(#{foo => 1, bar => 2}, #{bar => bar}, Logger)
|
||||||
|
),
|
||||||
|
receive
|
||||||
|
{log, Log} ->
|
||||||
|
?assertEqual(#{failed_to_merge => bar}, Log)
|
||||||
|
after 1000 -> error(timeout)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
].
|
|
@ -1006,15 +1006,9 @@ aggregate_status(AllStatus) ->
|
||||||
aggregate_metrics([]) ->
|
aggregate_metrics([]) ->
|
||||||
#{};
|
#{};
|
||||||
aggregate_metrics([HeadMetrics | AllMetrics]) ->
|
aggregate_metrics([HeadMetrics | AllMetrics]) ->
|
||||||
CombinerFun =
|
ErrorLogger = fun(Reason) -> ?SLOG(info, #{msg => "bad_metrics_value", error => Reason}) end,
|
||||||
fun ComFun(_Key, Val1, Val2) ->
|
|
||||||
case erlang:is_map(Val1) of
|
|
||||||
true -> emqx_map_lib:merge_with(ComFun, Val1, Val2);
|
|
||||||
false -> Val1 + Val2
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
Fun = fun(ElemMap, AccMap) ->
|
Fun = fun(ElemMap, AccMap) ->
|
||||||
emqx_map_lib:merge_with(CombinerFun, ElemMap, AccMap)
|
emqx_map_lib:best_effort_recursive_sum(AccMap, ElemMap, ErrorLogger)
|
||||||
end,
|
end,
|
||||||
lists:foldl(Fun, HeadMetrics, AllMetrics).
|
lists:foldl(Fun, HeadMetrics, AllMetrics).
|
||||||
|
|
||||||
|
|
|
@ -369,15 +369,9 @@ aggregate_status(AllStatus) ->
|
||||||
aggregate_metrics([]) ->
|
aggregate_metrics([]) ->
|
||||||
#{};
|
#{};
|
||||||
aggregate_metrics([HeadMetrics | AllMetrics]) ->
|
aggregate_metrics([HeadMetrics | AllMetrics]) ->
|
||||||
CombinerFun =
|
ErrorLogger = fun(Reason) -> ?SLOG(info, #{msg => "bad_metrics_value", error => Reason}) end,
|
||||||
fun ComFun(_Key, Val1, Val2) ->
|
|
||||||
case erlang:is_map(Val1) of
|
|
||||||
true -> emqx_map_lib:merge_with(ComFun, Val1, Val2);
|
|
||||||
false -> Val1 + Val2
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
Fun = fun(ElemMap, AccMap) ->
|
Fun = fun(ElemMap, AccMap) ->
|
||||||
emqx_map_lib:merge_with(CombinerFun, ElemMap, AccMap)
|
emqx_map_lib:best_effort_recursive_sum(AccMap, ElemMap, ErrorLogger)
|
||||||
end,
|
end,
|
||||||
lists:foldl(Fun, HeadMetrics, AllMetrics).
|
lists:foldl(Fun, HeadMetrics, AllMetrics).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue