fix(emqx_bridge): propagate connection error to resource status

This commit is contained in:
Stefan Strigler 2023-03-15 14:50:40 +01:00
parent fd23800370
commit 53825b9aba
7 changed files with 152 additions and 85 deletions

View File

@ -54,6 +54,17 @@ emqx_bridge_schema {
}
}
# Description and label texts (English and Chinese) for the `status_reason`
# field; the schema refers to this entry as ?DESC("desc_status_reason").
desc_status_reason {
desc {
en: "This is the reason given in case a bridge is failing to connect."
zh: "桥接连接失败的原因。"
}
label: {
en: "Failure reason"
zh: "失败原因"
}
}
desc_node_status {
desc {
en: """The status of the bridge for each node.

View File

@ -748,7 +748,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
format_bridge_info_with_metrics([FirstBridge | _] = Bridges) ->
Res = maps:remove(node, FirstBridge),
NodeStatus = collect_status(Bridges),
NodeStatus = node_status(Bridges),
NodeMetrics = collect_metrics(Bridges),
redact(Res#{
status => aggregate_status(NodeStatus),
@ -765,8 +765,8 @@ format_bridge_metrics(Bridges) ->
Res = format_bridge_info_with_metrics(Bridges),
maps:with([metrics, node_metrics], Res).
%% Reduce every bridge map in `Bridges' to its `node' and `status' entries.
%% Keys that a map does not contain are simply absent from the result.
collect_status(Bridges) ->
    PickStatus = fun(Bridge) -> maps:with([node, status], Bridge) end,
    lists:map(PickStatus, Bridges).
%% Build the per-node status list: each bridge map in `Bridges' is trimmed
%% down to its `node', `status' and `status_reason' entries. A key that is
%% missing from a map (e.g. no `status_reason') is left out of the result.
node_status(Bridges) ->
    StatusKeys = [node, status, status_reason],
    lists:map(fun(Bridge) -> maps:with(StatusKeys, Bridge) end, Bridges).
aggregate_status(AllStatus) ->
Head = fun([A | _]) -> A end,
@ -837,52 +837,63 @@ format_resource(
)
).
%% Reduce resource data to the fields exposed for a bridge: always `status',
%% plus `metrics' (passed through format_metrics/1) when the input has them.
%% Input without a `status' key is not accepted (function_clause).
format_resource_data(#{status := Status} = ResData) ->
    case ResData of
        #{metrics := Metrics} ->
            #{status => Status, metrics => format_metrics(Metrics)};
        _NoMetrics ->
            #{status => Status}
    end.
%% Keep only the `status', `metrics' and `error' entries of `ResData' and
%% fold them, one key at a time, through format_resource_data/3, starting
%% from an empty map.
format_resource_data(ResData) ->
    Relevant = maps:with([status, metrics, error], ResData),
    maps:fold(fun format_resource_data/3, #{}, Relevant).
format_metrics(#{
counters := #{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.expired' := DroppedExpired,
'dropped.queue_full' := DroppedQueueFull,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed,
'success' := SentSucc,
'received' := Rcvd
format_resource_data(error, undefined, Result) ->
Result;
format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_misc:readable_error_msg(Error)};
format_resource_data(
metrics,
#{
counters := #{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.expired' := DroppedExpired,
'dropped.queue_full' := DroppedQueueFull,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed,
'success' := SentSucc,
'received' := Rcvd
},
gauges := Gauges,
rate := #{
matched := #{current := Rate, last5m := Rate5m, max := RateMax}
}
},
gauges := Gauges,
rate := #{
matched := #{current := Rate, last5m := Rate5m, max := RateMax}
}
}) ->
Result
) ->
Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
?METRICS(
Dropped,
DroppedOther,
DroppedExpired,
DroppedQueueFull,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
LateReply,
SentFailed,
SentInflight,
SentSucc,
Rate,
Rate5m,
RateMax,
Rcvd
).
Result#{
metrics =>
?METRICS(
Dropped,
DroppedOther,
DroppedExpired,
DroppedQueueFull,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
LateReply,
SentFailed,
SentInflight,
SentSucc,
Rate,
Rate5m,
RateMax,
Rcvd
)
};
format_resource_data(K, V, Result) ->
Result#{K => V}.
fill_defaults(Type, RawConf) ->
PackedConf = pack_bridge_conf(Type, RawConf),
@ -924,6 +935,7 @@ filter_out_request_body(Conf) ->
<<"type">>,
<<"name">>,
<<"status">>,
%% The formatted bridge data carries the failure text under `status_reason'
%% (derived from the resource's `error'), so that is the key a client will
%% echo back. NOTE(review): presumably this list names response-only keys
%% dropped from request bodies - confirm against the full function.
%% `error' is kept so existing behaviour is a strict subset of the new one.
<<"status_reason">>,
<<"error">>,
<<"node_status">>,
<<"node_metrics">>,
<<"metrics">>,

View File

@ -106,6 +106,12 @@ common_bridge_fields() ->
status_fields() ->
[
{"status", mk(status(), #{desc => ?DESC("desc_status")})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})},
{"node_status",
mk(
hoconsc:array(ref(?MODULE, "node_status")),
@ -190,7 +196,13 @@ fields("node_metrics") ->
fields("node_status") ->
[
node_name(),
{"status", mk(status(), #{})}
{"status", mk(status(), #{})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})}
].
desc(bridges) ->

View File

@ -23,7 +23,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>).
-define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_TYPE_HTTP, <<"webhook">>).
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(URL(PORT, PATH),
list_to_binary(
@ -48,7 +48,7 @@
}).
-define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
-define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{
-define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{
<<"url">> => URL,
<<"local_topic">> => <<"emqx_webhook/#">>,
<<"method">> => <<"post">>,
@ -57,6 +57,7 @@
<<"content-type">> => <<"application/json">>
}
}).
-define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
%% Delegates to emqx_common_test_helpers:all/1 with this module.
%% NOTE(review): presumably the Common Test all/0 callback returning the
%% suite's test cases - confirm against the helper.
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -206,12 +207,12 @@ t_http_crud_apis(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("---bridge: ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -219,7 +220,7 @@ t_http_crud_apis(Config) ->
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% send a message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
@ -243,11 +244,11 @@ t_http_crud_apis(Config) ->
{ok, 200, Bridge2} = request(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL2, Name)
),
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -262,7 +263,7 @@ t_http_crud_apis(Config) ->
?assertMatch(
[
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -279,7 +280,7 @@ t_http_crud_apis(Config) ->
{ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -311,7 +312,7 @@ t_http_crud_apis(Config) ->
{ok, 404, ErrMsg2} = request(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL2, Name)
),
?assertMatch(
#{
@ -340,6 +341,34 @@ t_http_crud_apis(Config) ->
},
emqx_json:decode(ErrMsg3, [return_maps])
),
%% Create a non-working bridge
BrokenURL = ?URL(Port + 1, "/foo"),
{ok, 201, BrokenBridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(BrokenURL, Name)
),
#{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"Connection refused">>,
<<"node_status">> := [
#{<<"status">> := <<"disconnected">>, <<"status_reason">> := <<"Connection refused">>}
| _
],
<<"url">> := BrokenURL
} = emqx_json:decode(BrokenBridge, [return_maps]),
{ok, 200, FixedBridgeResponse} = request(put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(URL1)),
#{
<<"status">> := <<"connected">>,
<<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _]
} = FixedBridge = emqx_json:decode(FixedBridgeResponse, [return_maps]),
?assert(not maps:is_key(<<"status_reason">>, FixedBridge)),
?assert(not maps:is_key(<<"status_reason">>, FixedNodeStatus)),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
ok.
t_http_bridges_local_topic(Config) ->
@ -356,16 +385,16 @@ t_http_bridges_local_topic(Config) ->
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name1)
?HTTP_BRIDGE(URL1, Name1)
),
%% and we create another one without local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name2))
maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2))
),
BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name1),
BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name2),
BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name1),
BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name2),
%% Send a message to emqx and the message should be forwarded to the HTTP server.
%% This is to verify we can have 2 bridges with and without local_topic fields
%% at the same time.
@ -400,11 +429,11 @@ t_check_dependent_actions_on_delete(Config) ->
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = <<"t_http_crud_apis">>,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
{ok, 201, Rule} = request(
post,
@ -438,11 +467,11 @@ t_cascade_delete_actions(Config) ->
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = <<"t_http_crud_apis">>,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
{ok, 201, Rule} = request(
post,
@ -472,7 +501,7 @@ t_cascade_delete_actions(Config) ->
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
{ok, 201, _} = request(
post,
@ -496,9 +525,9 @@ t_broken_bpapi_vsn(Config) ->
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% still works since we redirect to 'restart'
{ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
{ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>),
@ -511,9 +540,9 @@ t_old_bpapi_vsn(Config) ->
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>),
%% still works since we redirect to 'restart'
@ -551,18 +580,18 @@ do_start_stop_bridges(Type, Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% stop it
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
@ -633,18 +662,18 @@ t_enable_disable_bridges(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% disable it
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
@ -690,18 +719,18 @@ t_reset_bridges(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
%% delete the bridge
@ -748,20 +777,20 @@ t_bridges_probe(Config) ->
{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
?HTTP_BRIDGE(URL)
),
%% second time with same name is ok since no real bridge created
{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
?HTTP_BRIDGE(URL)
),
{ok, 400, NxDomain} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME)
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>)
),
?assertMatch(
#{
@ -882,12 +911,12 @@ t_metrics(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("---bridge: ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -895,7 +924,7 @@ t_metrics(Config) ->
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% check for empty bridge metrics
{ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
@ -963,7 +992,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
Name = ?BRIDGE_NAME,
BadBridgeParams =
emqx_map_lib:deep_merge(
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
?HTTP_BRIDGE(URL1, Name),
#{
<<"request_timeout">> => <<"1s">>,
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}

View File

@ -41,6 +41,7 @@
callback_mode := callback_mode(),
query_mode := query_mode(),
config := resource_config(),
error := term(),
state := resource_state(),
status := resource_status(),
metrics => emqx_metrics_worker:metrics()

View File

@ -666,6 +666,7 @@ maybe_reply(Actions, From, Reply) ->
data_record_to_external_map(Data) ->
#{
id => Data#data.id,
error => Data#data.error,
mod => Data#data.mod,
callback_mode => Data#data.callback_mode,
query_mode => Data#data.query_mode,

View File

@ -0,0 +1 @@
Fix `bridges` API to report error conditions for a failing bridge as `status_reason`.