Merge pull request #10145 from sstrigler/EMQX-8930-fix-reporting-errors-in-failing-bridges

fix reporting errors in failing bridges
This commit is contained in:
Stefan Strigler 2023-03-21 16:52:10 +01:00 committed by GitHub
commit e2b372f7ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 264 additions and 147 deletions

View File

@ -545,10 +545,23 @@ readable_error_msg(Error) ->
{ok, Msg} ->
Msg;
false ->
iolist_to_binary(io_lib:format("~0p", [Error]))
to_hr_error(Error)
end
end.
%% Translate an error term into a human-readable binary.
%% A handful of well-known connection/authentication errors get a fixed
%% message; every other term is printed on a single line with `~0p`.
to_hr_error(Error) ->
    case Error of
        nxdomain ->
            <<"Could not resolve host">>;
        econnrefused ->
            <<"Connection refused">>;
        {unauthorized_client, _} ->
            <<"Unauthorized client">>;
        {not_authorized, _} ->
            <<"Not authorized">>;
        {malformed_username_or_password, _} ->
            <<"Bad username or password">>;
        _Other ->
            iolist_to_binary(io_lib:format("~0p", [Error]))
    end.
try_to_existing_atom(Convert, Data, Encoding) ->
try Convert(Data, Encoding) of
Atom ->

View File

@ -54,6 +54,17 @@ emqx_bridge_schema {
}
}
## Description and label texts (English and Chinese) for the bridge
## failure reason, looked up under the key "desc_status_reason".
desc_status_reason {
desc {
en: "This is the reason given in case a bridge is failing to connect."
zh: "桥接连接失败的原因。"
}
label: {
en: "Failure reason"
zh: "失败原因"
}
}
desc_node_status {
desc {
en: """The status of the bridge for each node.

View File

@ -46,18 +46,33 @@
-export([lookup_from_local_node/2]).
-define(BAD_REQUEST(Reason), {400, error_msg('BAD_REQUEST', Reason)}).
%% [TODO] Move those to a commonly shared header file
-define(ERROR_MSG(CODE, REASON), #{code => CODE, message => emqx_misc:readable_error_msg(REASON)}).
-define(OK(CONTENT), {200, CONTENT}).
-define(NO_CONTENT, 204).
-define(BAD_REQUEST(CODE, REASON), {400, ?ERROR_MSG(CODE, REASON)}).
-define(BAD_REQUEST(REASON), ?BAD_REQUEST('BAD_REQUEST', REASON)).
-define(NOT_FOUND(REASON), {404, ?ERROR_MSG('NOT_FOUND', REASON)}).
-define(INTERNAL_ERROR(REASON), {500, ?ERROR_MSG('INTERNAL_ERROR', REASON)}).
-define(NOT_IMPLEMENTED, 501).
-define(SERVICE_UNAVAILABLE(REASON), {503, ?ERROR_MSG('SERVICE_UNAVAILABLE', REASON)}).
%% End TODO
-define(BRIDGE_NOT_ENABLED,
?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>)
).
-define(NOT_FOUND(Reason), {404, error_msg('NOT_FOUND', Reason)}).
-define(BRIDGE_NOT_FOUND(BridgeType, BridgeName),
-define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME),
?NOT_FOUND(
<<"Bridge lookup failed: bridge named '", BridgeName/binary, "' of type ",
(atom_to_binary(BridgeType))/binary, " does not exist.">>
<<"Bridge lookup failed: bridge named '", BRIDGE_NAME/binary, "' of type ",
(atom_to_binary(BRIDGE_TYPE))/binary, " does not exist.">>
)
).
@ -480,7 +495,7 @@ schema("/bridges_probe") ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>);
{error, not_found} ->
Conf = filter_out_request_body(Conf0),
{ok, _} = emqx_bridge:create(BridgeType, BridgeName, Conf),
@ -495,9 +510,9 @@ schema("/bridges_probe") ->
format_resource(Data, Node)
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges
],
{200, zip_bridges([AllBridges])};
?OK(zip_bridges([AllBridges]));
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
?INTERNAL_ERROR(Reason)
end.
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
@ -529,16 +544,16 @@ schema("/bridges_probe") ->
end,
case emqx_bridge:check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActs) of
{ok, _} ->
204;
?NO_CONTENT;
{error, {rules_deps_on_this_bridge, RuleIds}} ->
?BAD_REQUEST(
{<<"Cannot delete bridge while active rules are defined for this bridge">>,
RuleIds}
);
{error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
?INTERNAL_ERROR(Reason)
end;
{error, not_found} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
@ -555,7 +570,7 @@ schema("/bridges_probe") ->
ok = emqx_bridge_resource:reset_metrics(
emqx_bridge_resource:resource_id(BridgeType, BridgeName)
),
{204}
?NO_CONTENT
end
).
@ -566,9 +581,9 @@ schema("/bridges_probe") ->
Params1 = maybe_deobfuscate_bridge_probe(Params),
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of
ok ->
204;
?NO_CONTENT;
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
{400, error_msg('TEST_FAILED', to_hr_reason(Reason))}
?BAD_REQUEST('TEST_FAILED', Reason)
end;
BadRequest ->
BadRequest
@ -602,7 +617,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
{ok, [{error, not_found} | _]} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
?INTERNAL_ERROR(Reason)
end.
lookup_from_local_node(BridgeType, BridgeName) ->
@ -620,15 +635,15 @@ lookup_from_local_node(BridgeType, BridgeName) ->
OperFunc ->
case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of
{ok, _} ->
204;
?NO_CONTENT;
{error, {pre_config_update, _, bridge_not_found}} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, {_, _, timeout}} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
?INTERNAL_ERROR(Reason)
end
end
).
@ -748,7 +763,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
format_bridge_info_with_metrics([FirstBridge | _] = Bridges) ->
Res = maps:remove(node, FirstBridge),
NodeStatus = collect_status(Bridges),
NodeStatus = node_status(Bridges),
NodeMetrics = collect_metrics(Bridges),
redact(Res#{
status => aggregate_status(NodeStatus),
@ -765,8 +780,8 @@ format_bridge_metrics(Bridges) ->
Res = format_bridge_info_with_metrics(Bridges),
maps:with([metrics, node_metrics], Res).
collect_status(Bridges) ->
[maps:with([node, status], B) || B <- Bridges].
%% Strip each per-node bridge map down to its node, status and
%% (when present) status_reason entries.
node_status(Bridges) ->
    Keys = [node, status, status_reason],
    [maps:with(Keys, Bridge) || Bridge <- Bridges].
aggregate_status(AllStatus) ->
Head = fun([A | _]) -> A end,
@ -837,12 +852,16 @@ format_resource(
)
).
format_resource_data(#{status := Status, metrics := Metrics}) ->
#{status => Status, metrics => format_metrics(Metrics)};
format_resource_data(#{status := Status}) ->
#{status => Status}.
%% Build the formatted resource map by folding format_resource_data/3
%% over the status, metrics and error entries of ResData; all other keys
%% are ignored.
format_resource_data(ResData) ->
    Relevant = maps:with([status, metrics, error], ResData),
    maps:fold(fun format_resource_data/3, #{}, Relevant).
format_metrics(#{
format_resource_data(error, undefined, Result) ->
Result;
format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_misc:readable_error_msg(Error)};
format_resource_data(
metrics,
#{
counters := #{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
@ -861,9 +880,13 @@ format_metrics(#{
rate := #{
matched := #{current := Rate, last5m := Rate5m, max := RateMax}
}
}) ->
},
Result
) ->
Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
Result#{
metrics =>
?METRICS(
Dropped,
DroppedOther,
@ -882,7 +905,10 @@ format_metrics(#{
Rate5m,
RateMax,
Rcvd
).
)
};
format_resource_data(K, V, Result) ->
Result#{K => V}.
fill_defaults(Type, RawConf) ->
PackedConf = pack_bridge_conf(Type, RawConf),
@ -924,6 +950,7 @@ filter_out_request_body(Conf) ->
<<"type">>,
<<"name">>,
<<"status">>,
<<"error">>,
<<"node_status">>,
<<"node_metrics">>,
<<"metrics">>,
@ -931,9 +958,6 @@ filter_out_request_body(Conf) ->
],
maps:without(ExtraConfs, Conf).
error_msg(Code, Msg) ->
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
bin(S) when is_list(S) ->
list_to_binary(S);
bin(S) when is_atom(S) ->
@ -944,30 +968,31 @@ bin(S) when is_binary(S) ->
call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
204;
?NO_CONTENT;
{error, not_implemented} ->
%% Should only happen if we call `start` on a node that is
%% still on an older bpapi version that doesn't support it.
maybe_try_restart(NodeOrAll, OperFunc, Args);
{error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
?SERVICE_UNAVAILABLE(<<"Request timeout">>);
{error, {start_pool_failed, Name, Reason}} ->
{503,
error_msg(
'SERVICE_UNAVAILABLE',
bin(
io_lib:format(
"failed to start ~p pool for reason ~p",
[Name, Reason]
)
)
)};
?SERVICE_UNAVAILABLE(
bin(io_lib:format("Failed to start ~p pool for reason ~p", [Name, Reason]))
);
{error, not_found} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
?SLOG(warning, #{
msg => "bridge_inconsistent_in_cluster_for_call_operation",
reason => not_found,
type => BridgeType,
name => BridgeName,
bridge => BridgeId
}),
?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>);
{error, {node_not_found, Node}} ->
?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>);
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
?BAD_REQUEST(to_hr_reason(Reason))
?BAD_REQUEST(Reason)
end.
maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
@ -975,7 +1000,7 @@ maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
maybe_try_restart(Node, start_bridge_to_node, Args) ->
call_operation(Node, restart_bridge_to_node, Args);
maybe_try_restart(_, _, _) ->
501.
?NOT_IMPLEMENTED.
do_bpapi_call(all, Call, Args) ->
maybe_unwrap(
@ -1006,19 +1031,6 @@ supported_versions(start_bridge_to_node) -> [2, 3];
supported_versions(start_bridges_to_all_nodes) -> [2, 3];
supported_versions(_Call) -> [1, 2, 3].
to_hr_reason(nxdomain) ->
<<"Host not found">>;
to_hr_reason(econnrefused) ->
<<"Connection refused">>;
to_hr_reason({unauthorized_client, _}) ->
<<"Unauthorized client">>;
to_hr_reason({not_authorized, _}) ->
<<"Not authorized">>;
to_hr_reason({malformed_username_or_password, _}) ->
<<"Malformed username or password">>;
to_hr_reason(Reason) ->
Reason.
%% Local shorthand that hands the given data to emqx_misc:redact/1 and
%% returns its result unchanged.
redact(Data) ->
    emqx_misc:redact(Data).

View File

@ -106,6 +106,12 @@ common_bridge_fields() ->
status_fields() ->
[
{"status", mk(status(), #{desc => ?DESC("desc_status")})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})},
{"node_status",
mk(
hoconsc:array(ref(?MODULE, "node_status")),
@ -190,7 +196,13 @@ fields("node_metrics") ->
fields("node_status") ->
[
node_name(),
{"status", mk(status(), #{})}
{"status", mk(status(), #{})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})}
].
desc(bridges) ->

View File

@ -23,7 +23,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>).
-define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_TYPE_HTTP, <<"webhook">>).
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
-define(URL(PORT, PATH),
list_to_binary(
@ -48,7 +48,7 @@
}).
-define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
-define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{
-define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{
<<"url">> => URL,
<<"local_topic">> => <<"emqx_webhook/#">>,
<<"method">> => <<"post">>,
@ -57,6 +57,7 @@
<<"content-type">> => <<"application/json">>
}
}).
-define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -97,6 +98,20 @@ init_per_testcase(t_old_bpapi_vsn, Config) ->
meck:expect(emqx_bpapi, supported_version, 1, 1),
meck:expect(emqx_bpapi, supported_version, 2, 1),
init_per_testcase(common, Config);
init_per_testcase(StartStop, Config) when
StartStop == t_start_stop_bridges_cluster;
StartStop == t_start_stop_bridges_node
->
meck:new(emqx_bridge_resource, [passthrough]),
meck:expect(
emqx_bridge_resource,
stop,
fun
(_, <<"bridge_not_found">>) -> {error, not_found};
(Type, Name) -> meck:passthrough([Type, Name])
end
),
init_per_testcase(common, Config);
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
{Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
@ -108,6 +123,12 @@ end_per_testcase(t_broken_bpapi_vsn, Config) ->
end_per_testcase(t_old_bpapi_vsn, Config) ->
meck:unload([emqx_bpapi]),
end_per_testcase(common, Config);
end_per_testcase(StartStop, Config) when
StartStop == t_start_stop_bridges_cluster;
StartStop == t_start_stop_bridges_node
->
meck:unload([emqx_bridge_resource]),
end_per_testcase(common, Config);
end_per_testcase(_, Config) ->
Sock = ?config(sock, Config),
Acceptor = ?config(acceptor, Config),
@ -206,12 +227,12 @@ t_http_crud_apis(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("---bridge: ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -219,7 +240,7 @@ t_http_crud_apis(Config) ->
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% send an message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
@ -243,11 +264,11 @@ t_http_crud_apis(Config) ->
{ok, 200, Bridge2} = request(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL2, Name)
),
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -262,7 +283,7 @@ t_http_crud_apis(Config) ->
?assertMatch(
[
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -279,7 +300,7 @@ t_http_crud_apis(Config) ->
{ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -311,7 +332,7 @@ t_http_crud_apis(Config) ->
{ok, 404, ErrMsg2} = request(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL2, Name)
),
?assertMatch(
#{
@ -340,6 +361,34 @@ t_http_crud_apis(Config) ->
},
emqx_json:decode(ErrMsg3, [return_maps])
),
%% Create non working bridge
BrokenURL = ?URL(Port + 1, "/foo"),
{ok, 201, BrokenBridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(BrokenURL, Name)
),
#{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"Connection refused">>,
<<"node_status">> := [
#{<<"status">> := <<"disconnected">>, <<"status_reason">> := <<"Connection refused">>}
| _
],
<<"url">> := BrokenURL
} = emqx_json:decode(BrokenBridge, [return_maps]),
{ok, 200, FixedBridgeResponse} = request(put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(URL1)),
#{
<<"status">> := <<"connected">>,
<<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _]
} = FixedBridge = emqx_json:decode(FixedBridgeResponse, [return_maps]),
?assert(not maps:is_key(<<"status_reason">>, FixedBridge)),
?assert(not maps:is_key(<<"status_reason">>, FixedNodeStatus)),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
ok.
t_http_bridges_local_topic(Config) ->
@ -356,16 +405,16 @@ t_http_bridges_local_topic(Config) ->
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name1)
?HTTP_BRIDGE(URL1, Name1)
),
%% and we create another one without local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name2))
maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2))
),
BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name1),
BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name2),
BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name1),
BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name2),
%% Send an message to emqx and the message should be forwarded to the HTTP server.
%% This is to verify we can have 2 bridges with and without local_topic fields
%% at the same time.
@ -400,11 +449,11 @@ t_check_dependent_actions_on_delete(Config) ->
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = <<"t_http_crud_apis">>,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
{ok, 201, Rule} = request(
post,
@ -438,11 +487,11 @@ t_cascade_delete_actions(Config) ->
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = <<"t_http_crud_apis">>,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
{ok, 201, Rule} = request(
post,
@ -472,7 +521,7 @@ t_cascade_delete_actions(Config) ->
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
{ok, 201, _} = request(
post,
@ -496,9 +545,9 @@ t_broken_bpapi_vsn(Config) ->
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% still works since we redirect to 'restart'
{ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
{ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>),
@ -511,9 +560,9 @@ t_old_bpapi_vsn(Config) ->
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>),
%% still works since we redirect to 'restart'
@ -551,18 +600,18 @@ do_start_stop_bridges(Type, Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% stop it
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
@ -597,6 +646,16 @@ do_start_stop_bridges(Type, Config) ->
%% Looks ok but doesn't exist
{ok, 404, _} = request(post, operation_path(Type, start, <<"webhook:cptn_hook">>), <<"">>),
%%
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, <<"bridge_not_found">>)
),
{ok, 503, _} = request(
post, operation_path(Type, stop, <<"webhook:bridge_not_found">>), <<"">>
),
%% Create broken bridge
{ListenPort, Sock} = listen_on_random_port(),
%% Connecting to this endpoint should always timeout
@ -633,18 +692,18 @@ t_enable_disable_bridges(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% disable it
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
@ -690,18 +749,18 @@ t_reset_bridges(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
{ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
%% delete the bridge
@ -748,20 +807,20 @@ t_bridges_probe(Config) ->
{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
?HTTP_BRIDGE(URL)
),
%% second time with same name is ok since no real bridge created
{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
?HTTP_BRIDGE(URL)
),
{ok, 400, NxDomain} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME)
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>)
),
?assertMatch(
#{
@ -790,7 +849,7 @@ t_bridges_probe(Config) ->
emqx_json:decode(ConnRefused, [return_maps])
),
{ok, 400, HostNotFound} = request(
{ok, 400, CouldNotResolveHost} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"nohost:2883">>)
@ -798,9 +857,9 @@ t_bridges_probe(Config) ->
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Host not found">>
<<"message">> := <<"Could not resolve host">>
},
emqx_json:decode(HostNotFound, [return_maps])
emqx_json:decode(CouldNotResolveHost, [return_maps])
),
AuthnConfig = #{
@ -844,7 +903,7 @@ t_bridges_probe(Config) ->
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Malformed username or password">>
<<"message">> := <<"Bad username or password">>
},
emqx_json:decode(Malformed, [return_maps])
),
@ -882,12 +941,12 @@ t_metrics(Config) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
?HTTP_BRIDGE(URL1, Name)
),
%ct:pal("---bridge: ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
@ -895,7 +954,7 @@ t_metrics(Config) ->
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
%% check for empty bridge metrics
{ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
@ -963,7 +1022,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
Name = ?BRIDGE_NAME,
BadBridgeParams =
emqx_map_lib:deep_merge(
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
?HTTP_BRIDGE(URL1, Name),
#{
<<"request_timeout">> => <<"1s">>,
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}

View File

@ -41,6 +41,7 @@
callback_mode := callback_mode(),
query_mode := query_mode(),
config := resource_config(),
error := term(),
state := resource_state(),
status := resource_status(),
metrics => emqx_metrics_worker:metrics()

View File

@ -522,7 +522,7 @@ start_resource(Data, From) ->
id => Data#data.id,
reason => Reason
}),
_ = maybe_alarm(disconnected, Data#data.id),
_ = maybe_alarm(disconnected, Data#data.id, Data#data.error),
%% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{status = disconnected, error = Reason},
@ -597,7 +597,7 @@ with_health_check(Data, Func) ->
ResId = Data#data.id,
HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
{Status, NewState, Err} = parse_health_check_result(HCRes, Data),
_ = maybe_alarm(Status, ResId),
_ = maybe_alarm(Status, ResId, Err),
ok = maybe_resume_resource_workers(ResId, Status),
UpdatedData = Data#data{
state = NewState, status = Status, error = Err
@ -616,15 +616,20 @@ update_state(Data, _DataWas) ->
health_check_interval(Opts) ->
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
maybe_alarm(connected, _ResId) ->
maybe_alarm(connected, _ResId, _Error) ->
ok;
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>) ->
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error) ->
ok;
maybe_alarm(_Status, ResId) ->
maybe_alarm(_Status, ResId, Error) ->
HrError =
case Error of
undefined -> <<"Unknown reason">>;
_Else -> emqx_misc:readable_error_msg(Error)
end,
emqx_alarm:activate(
ResId,
#{resource_id => ResId, reason => resource_down},
<<"resource down: ", ResId/binary>>
<<"resource down: ", HrError/binary>>
).
maybe_resume_resource_workers(ResId, connected) ->
@ -666,6 +671,7 @@ maybe_reply(Actions, From, Reply) ->
data_record_to_external_map(Data) ->
#{
id => Data#data.id,
error => Data#data.error,
mod => Data#data.mod,
callback_mode => Data#data.callback_mode,
query_mode => Data#data.query_mode,

View File

@ -0,0 +1,3 @@
Fix the `bridges` API to report the error condition of a failing bridge as
`status_reason`. Also, when an alarm is raised for a failing resource, this
error condition is now included in the alarm's message.