Merge pull request #10032 from keynslug/fix/EMQX-9019/no-metrics-disconnected
fix(bridgeapi): anticipate node responses w/o metrics
This commit is contained in:
commit
7dce152ffd
|
@ -743,40 +743,39 @@ collect_metrics(Bridges) ->
|
|||
|
||||
aggregate_metrics(AllMetrics) ->
|
||||
InitMetrics = ?EMPTY_METRICS,
|
||||
lists:foldl(
|
||||
fun(
|
||||
#{
|
||||
metrics := ?metrics(
|
||||
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
|
||||
)
|
||||
},
|
||||
?metrics(
|
||||
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
|
||||
)
|
||||
) ->
|
||||
?METRICS(
|
||||
M1 + N1,
|
||||
M2 + N2,
|
||||
M3 + N3,
|
||||
M4 + N4,
|
||||
M5 + N5,
|
||||
M6 + N6,
|
||||
M7 + N7,
|
||||
M8 + N8,
|
||||
M9 + N9,
|
||||
M10 + N10,
|
||||
M11 + N11,
|
||||
M12 + N12,
|
||||
M13 + N13,
|
||||
M14 + N14,
|
||||
M15 + N15,
|
||||
M16 + N16,
|
||||
M17 + N17
|
||||
)
|
||||
end,
|
||||
InitMetrics,
|
||||
AllMetrics
|
||||
).
|
||||
lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics).
|
||||
|
||||
aggregate_metrics(
|
||||
#{
|
||||
metrics := ?metrics(
|
||||
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
|
||||
)
|
||||
},
|
||||
?metrics(
|
||||
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
|
||||
)
|
||||
) ->
|
||||
?METRICS(
|
||||
M1 + N1,
|
||||
M2 + N2,
|
||||
M3 + N3,
|
||||
M4 + N4,
|
||||
M5 + N5,
|
||||
M6 + N6,
|
||||
M7 + N7,
|
||||
M8 + N8,
|
||||
M9 + N9,
|
||||
M10 + N10,
|
||||
M11 + N11,
|
||||
M12 + N12,
|
||||
M13 + N13,
|
||||
M14 + N14,
|
||||
M15 + N15,
|
||||
M16 + N16,
|
||||
M17 + N17
|
||||
);
|
||||
aggregate_metrics(#{}, Metrics) ->
|
||||
Metrics.
|
||||
|
||||
format_resp(Data) ->
|
||||
format_resp(Data, node()).
|
||||
|
@ -786,18 +785,26 @@ format_resp(
|
|||
type := Type,
|
||||
name := BridgeName,
|
||||
raw_config := RawConf,
|
||||
resource_data := #{status := Status, metrics := Metrics}
|
||||
resource_data := ResourceData
|
||||
},
|
||||
Node
|
||||
) ->
|
||||
RawConfFull = fill_defaults(Type, RawConf),
|
||||
redact(RawConfFull#{
|
||||
type => Type,
|
||||
name => maps:get(<<"name">>, RawConf, BridgeName),
|
||||
node => Node,
|
||||
status => Status,
|
||||
metrics => format_metrics(Metrics)
|
||||
}).
|
||||
redact(
|
||||
maps:merge(
|
||||
RawConfFull#{
|
||||
type => Type,
|
||||
name => maps:get(<<"name">>, RawConf, BridgeName),
|
||||
node => Node
|
||||
},
|
||||
format_resource_data(ResourceData)
|
||||
)
|
||||
).
|
||||
|
||||
format_resource_data(#{status := Status, metrics := Metrics}) ->
|
||||
#{status => Status, metrics => format_metrics(Metrics)};
|
||||
format_resource_data(#{status := Status}) ->
|
||||
#{status => Status}.
|
||||
|
||||
format_metrics(#{
|
||||
counters := #{
|
||||
|
|
|
@ -140,17 +140,13 @@ stop_http_server(Sock, Acceptor) ->
|
|||
gen_tcp:close(Sock).
|
||||
|
||||
listen_on_random_port() ->
|
||||
Min = 1024,
|
||||
Max = 65000,
|
||||
rand:seed(exsplus, erlang:timestamp()),
|
||||
Port = rand:uniform(Max - Min) + Min,
|
||||
case
|
||||
gen_tcp:listen(Port, [
|
||||
binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}
|
||||
])
|
||||
of
|
||||
{ok, Sock} -> {Port, Sock};
|
||||
{error, eaddrinuse} -> listen_on_random_port()
|
||||
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
|
||||
case gen_tcp:listen(0, SockOpts) of
|
||||
{ok, Sock} ->
|
||||
{ok, Port} = inet:port(Sock),
|
||||
{Port, Sock};
|
||||
{error, Reason} when Reason /= eaddrinuse ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
accept_loop(Sock, HandleFun, Parent) ->
|
||||
|
@ -543,7 +539,9 @@ do_start_stop_bridges(Type, Config) ->
|
|||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||
|
||||
%% Create broken bridge
|
||||
BadServer = <<"nohost">>,
|
||||
{ListenPort, Sock} = listen_on_random_port(),
|
||||
%% Connecting to this endpoint should always timeout
|
||||
BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
|
||||
BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
|
||||
{ok, 201, BadBridge1} = request(
|
||||
post,
|
||||
|
@ -555,11 +553,15 @@ do_start_stop_bridges(Type, Config) ->
|
|||
<<"name">> := BadName,
|
||||
<<"enable">> := true,
|
||||
<<"server">> := BadServer,
|
||||
<<"status">> := <<"disconnected">>,
|
||||
<<"status">> := <<"connecting">>,
|
||||
<<"node_status">> := [_ | _]
|
||||
} = jsx:decode(BadBridge1),
|
||||
BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
|
||||
{ok, 500, _} = request(post, operation_path(Type, start, BadBridgeID), <<"">>),
|
||||
?assertMatch(
|
||||
{ok, SC, _} when SC == 500 orelse SC == 503,
|
||||
request(post, operation_path(Type, start, BadBridgeID), <<"">>)
|
||||
),
|
||||
ok = gen_tcp:close(Sock),
|
||||
ok.
|
||||
|
||||
t_enable_disable_bridges(Config) ->
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
When the resource manager is busy trying to establish a connection with the remote, the resource might yet lack any metrics information. Prior to this fix, the `bridges/` API handler crashed in such circumstances.
|
|
@ -0,0 +1 @@
|
|||
当资源管理器忙于尝试与远程建立连接时,资源可能还缺少任何度量信息。 在此修复之前,`bridges/' API 处理程序在这种情况下崩溃。
|
Loading…
Reference in New Issue