fix(bridge_v2_api): take status and error from bridge, not the connector

Fixes https://emqx.atlassian.net/browse/EMQX-11284
Fixes https://emqx.atlassian.net/browse/EMQX-11298
This commit is contained in:
Thales Macedo Garitezi 2023-11-08 09:45:50 -03:00
parent 26094ac611
commit f5456135aa
3 changed files with 79 additions and 22 deletions

View File

@ -709,8 +709,10 @@ format_resource(
#{
type := Type,
name := Name,
status := Status,
error := Error,
raw_config := RawConf,
resource_data := ResourceData
resource_data := _ResourceData
},
Node
) ->
@ -719,14 +721,16 @@ format_resource(
RawConf#{
type => Type,
name => maps:get(<<"name">>, RawConf, Name),
node => Node
node => Node,
status => Status,
error => Error
},
format_resource_data(ResourceData)
format_bridge_status_and_error(#{status => Status, error => Error})
)
).
format_resource_data(ResData) ->
maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)).
format_bridge_status_and_error(Data) ->
maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], Data)).
format_resource_data(error, undefined, Result) ->
Result;

View File

@ -145,6 +145,39 @@ create_bridge(Config, Overrides) ->
ct:pal("creating bridge with config: ~p", [BridgeConfig]),
emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig).
list_bridges_api() ->
Params = [],
Path = emqx_mgmt_api_test_util:api_path(["actions"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("listing bridges (via http)"),
Res =
case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
Error ->
Error
end,
ct:pal("list bridges result: ~p", [Res]),
Res.
get_bridge_api(BridgeType, BridgeName) ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
Params = [],
Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]),
Res =
case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
Error ->
Error
end,
ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
Res.
create_bridge_api(Config) ->
create_bridge_api(Config, _Overrides = #{}).

View File

@ -29,25 +29,27 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
_ = application:load(emqx_conf),
ok = emqx_common_test_helpers:start_apps(apps_to_start_and_stop()),
application:ensure_all_started(telemetry),
application:ensure_all_started(wolff),
application:ensure_all_started(brod),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_kafka,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(),
Config.
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps(apps_to_start_and_stop()).
apps_to_start_and_stop() ->
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge,
emqx_rule_engine
].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
t_create_remove_list(_) ->
[] = emqx_bridge_v2:list(),
@ -165,6 +167,24 @@ t_unknown_topic(_Config) ->
ok
end
),
?assertMatch(
{ok,
{{_, 200, _}, _, [
#{
<<"status">> := <<"disconnected">>,
<<"node_status">> := [#{<<"status">> := <<"disconnected">>}]
}
]}},
emqx_bridge_v2_testlib:list_bridges_api()
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"status">> := <<"disconnected">>,
<<"node_status">> := [#{<<"status">> := <<"disconnected">>}]
}}},
emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName)
),
ok.
check_send_message_with_bridge(BridgeName) ->