fix: update testcases for emqx_bridge

This commit is contained in:
Shawn 2022-03-07 16:48:36 +08:00
parent 72409782eb
commit 9a9c92ae88
9 changed files with 56 additions and 52 deletions

View File

@ -17,7 +17,7 @@
%% Bad Request
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(ALREADY_EXISTED, 'ALREADY_EXISTED').
-define(ALREADY_EXISTS, 'ALREADY_EXISTS').
-define(BAD_CONFIG_SCHEMA, 'BAD_CONFIG_SCHEMA').
-define(BAD_LISTENER_ID, 'BAD_LISTENER_ID').
-define(BAD_NODE_NAME, 'BAD_NODE_NAME').
@ -49,7 +49,7 @@
%% All codes
-define(ERROR_CODES,
[ {'BAD_REQUEST', <<"Request parameters are not legal">>}
, {'ALREADY_EXISTED', <<"Resource already existed">>}
, {'ALREADY_EXISTS', <<"Resource already existed">>}
, {'BAD_CONFIG_SCHEMA', <<"Configuration data is not legal">>}
, {'BAD_LISTENER_ID', <<"Bad listener ID">>}
, {'BAD_NODE_NAME', <<"Bad Node Name">>}

View File

@ -31,6 +31,7 @@
, bridge_type/1
, resource_id/1
, resource_id/2
, bridge_id/2
, parse_bridge_id/1
]).
@ -202,8 +203,9 @@ lookup(Type, Name) ->
lookup(Type, Name, RawConf) ->
case emqx_resource:get_instance(resource_id(Type, Name)) of
{error, not_found} -> {error, not_found};
{ok, _, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data,
raw_config => RawConf}}
{ok, _, Data} ->
{ok, #{type => Type, name => Name, resource_data => Data,
raw_config => RawConf}}
end.
start(Type, Name) ->

View File

@ -284,9 +284,8 @@ schema("/bridges/:id/operation/:operation") ->
}
}.
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
Conf = filter_out_request_body(Conf0),
BridgeName = emqx_misc:gen_id(),
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
@ -324,7 +323,7 @@ schema("/bridges/:id/operation/:operation") ->
#{override_to => cluster}) of
{ok, _} -> {204};
{error, Reason} ->
{500, error_msg('UNKNOWN_ERROR', Reason)}
{500, error_msg('INTERNAL_ERROR', Reason)}
end).
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
@ -335,7 +334,7 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
{ok, [{error, not_found} | _]} ->
{404, error_msg('NOT_FOUND', <<"not_found">>)};
{error, ErrL} ->
{500, error_msg('UNKNOWN_ERROR', ErrL)}
{500, error_msg('INTERNAL_ERROR', ErrL)}
end.
lookup_from_local_node(BridgeType, BridgeName) ->
@ -355,7 +354,7 @@ lookup_from_local_node(BridgeType, BridgeName) ->
{error, {pre_config_update, _, bridge_not_found}} ->
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
{error, Reason} ->
{500, error_msg('UNKNOWN_ERROR', Reason)}
{500, error_msg('INTERNAL_ERROR', Reason)}
end
end).
@ -373,17 +372,19 @@ ensure_bridge_created(BridgeType, BridgeName, Conf) ->
end.
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
lists:foldl(fun(#{id := Id}, Acc) ->
Bridges = pick_bridges_by_id(Id, BridgesAllNodes),
lists:foldl(fun(#{type := Type, name := Name}, Acc) ->
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
[format_bridge_info(Bridges) | Acc]
end, [], BridgesFirstNode).
pick_bridges_by_id(Id, BridgesAllNodes) ->
pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
lists:foldl(fun(BridgesOneNode, Acc) ->
case [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id] of
case [Bridge || Bridge = #{type := Type0, name := Name0} <- BridgesOneNode,
Type0 == Type, Name0 == Name] of
[BridgeInfo] -> [BridgeInfo | Acc];
[] ->
?SLOG(warning, #{msg => "bridge_inconsistent_in_cluster", bridge => Id}),
?SLOG(warning, #{msg => "bridge_inconsistent_in_cluster",
bridge => emqx_bridge:bridge_id(Type, Name)}),
Acc
end
end, [], BridgesAllNodes).
@ -421,9 +422,8 @@ aggregate_metrics(AllMetrics) ->
Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
end, InitMetrics, AllMetrics).
format_resp(#{id := Id, raw_config := RawConf,
format_resp(#{type := Type, name := BridgeName, raw_config := RawConf,
resource_data := #{status := Status, metrics := Metrics}}) ->
{Type, BridgeName} = emqx_bridge:parse_bridge_id(Id),
IsConnected = fun(connected) -> connected; (_) -> disconnected end,
RawConf#{
type => Type,
@ -448,7 +448,7 @@ is_ok(ResL) ->
end.
filter_out_request_body(Conf) ->
ExtraConfs = [<<"id">>, <<"type">>, <<"status">>, <<"node_status">>,
ExtraConfs = [<<"id">>, <<"type">>, <<"name">>, <<"status">>, <<"node_status">>,
<<"node_metrics">>, <<"metrics">>, <<"node">>],
maps:without(ExtraConfs, Conf).

View File

@ -10,7 +10,7 @@
%% Hocon Schema Definitions
roots() -> [].
fields("bridge") ->
fields("config") ->
basic_config() ++
[ {url, mk(binary(),
#{ required => true
@ -68,10 +68,11 @@ How long will the HTTP request timeout.
fields("post") ->
[ type_field()
] ++ fields("bridge");
, name_field()
] ++ fields("config");
fields("put") ->
fields("bridge");
fields("config");
fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
@ -82,10 +83,6 @@ basic_config() ->
#{ desc => "Enable or disable this bridge"
, default => true
})}
, {name,
mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}
, {direction,
mk(egress,
#{ desc => "The direction of this bridge, MUST be 'egress'"
@ -97,7 +94,14 @@ basic_config() ->
%%======================================================================================
type_field() ->
{type, mk(http, #{desc => "The Bridge Type"})}.
{type, mk(http,
#{ desc => "The Bridge Type"
})}.
name_field() ->
{name, mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}.
method() ->
enum([post, put, get, delete]).

View File

@ -24,9 +24,11 @@ fields("egress") ->
fields("post_ingress") ->
[ type_field()
, name_field()
] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") ->
[ type_field()
, name_field()
] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") ->
@ -41,4 +43,11 @@ fields("get_egress") ->
%%======================================================================================
type_field() ->
{type, mk(mqtt, #{desc => "The bridge type"})}.
{type, mk(mqtt,
#{ desc => "The bridge type."
})}.
name_field() ->
{name, mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}.

View File

@ -46,10 +46,6 @@ common_bridge_fields() ->
#{ desc => "Enable or disable this bridge"
, default => true
})}
, {name,
mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}
, {connector,
mk(binary(),
#{ required => true
@ -86,7 +82,7 @@ direction_field(Dir, Desc) ->
roots() -> [bridges].
fields(bridges) ->
[{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}]
[{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), #{})}]
++ [{T, mk(hoconsc:map(name, hoconsc:union([
ref(schema_mod(T), "ingress"),
ref(schema_mod(T), "egress")

View File

@ -152,8 +152,7 @@ t_http_crud_apis(_) ->
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
%ct:pal("---bridge: ~p", [Bridge]),
#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
#{ <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
@ -162,6 +161,7 @@ t_http_crud_apis(_) ->
, <<"url">> := URL1
} = jsx:decode(Bridge),
BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
%% send an message to emqx and the message should be forwarded to the HTTP server
wait_for_resource_ready(BridgeID, 5),
Body = <<"my msg">>,
@ -181,8 +181,7 @@ t_http_crud_apis(_) ->
URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(put, uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
?assertMatch(#{ <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
@ -193,8 +192,7 @@ t_http_crud_apis(_) ->
%% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
?assertMatch([#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
?assertMatch([#{ <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
@ -205,8 +203,7 @@ t_http_crud_apis(_) ->
%% get the bridge by id
{ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
?assertMatch(#{ <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
@ -251,8 +248,7 @@ t_start_stop_bridges(_) ->
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
#{ <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
@ -260,31 +256,28 @@ t_start_stop_bridges(_) ->
, <<"node_metrics">> := [_|_]
, <<"url">> := URL1
} = jsx:decode(Bridge),
BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
%% stop it
{ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"disconnected">>
?assertMatch(#{ <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
{ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">>
?assertMatch(#{ <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
{ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">>
?assertMatch(#{ <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
{ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
%% restart a stopped bridge
{ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">>
?assertMatch(#{ <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),

View File

@ -68,7 +68,7 @@ schema("/banned") ->
responses => #{
200 => [{data, hoconsc:mk(hoconsc:array(hoconsc:ref(ban)), #{})}],
400 => emqx_dashboard_swagger:error_codes(
['ALREADY_EXISTED', 'BAD_REQUEST'],
['ALREADY_EXISTS', 'BAD_REQUEST'],
<<"Banned already existed, or bad args">>)
}
}
@ -141,7 +141,7 @@ banned(post, #{body := Body}) ->
case emqx_banned:create(Ban) of
{ok, Banned} -> {200, format(Banned)};
{error, {already_exist, Old}} ->
{400, 'ALREADY_EXISTED', format(Old)}
{400, 'ALREADY_EXISTS', format(Old)}
end
end.

View File

@ -264,7 +264,7 @@ trace(post, #{body := Param}) ->
{ok, Trace0} -> {200, format_trace(Trace0)};
{error, {already_existed, Name}} ->
{400, #{
code => 'ALREADY_EXISTED',
code => 'ALREADY_EXISTS',
message => ?TO_BIN([Name, " Already Exists"])
}};
{error, {duplicate_condition, Name}} ->