Merge pull request #7228 from terry-xiaoyu/improve_rule_bridge_apis
Improve APIs for bridges and connectors
This commit is contained in:
commit
5056e9fa55
|
@ -17,7 +17,7 @@
|
|||
%% Bad Request
|
||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||
|
||||
-define(ALREADY_EXISTED, 'ALREADY_EXISTED').
|
||||
-define(ALREADY_EXISTS, 'ALREADY_EXISTS').
|
||||
-define(BAD_CONFIG_SCHEMA, 'BAD_CONFIG_SCHEMA').
|
||||
-define(BAD_LISTENER_ID, 'BAD_LISTENER_ID').
|
||||
-define(BAD_NODE_NAME, 'BAD_NODE_NAME').
|
||||
|
@ -49,7 +49,7 @@
|
|||
%% All codes
|
||||
-define(ERROR_CODES,
|
||||
[ {'BAD_REQUEST', <<"Request parameters are not legal">>}
|
||||
, {'ALREADY_EXISTED', <<"Resource already existed">>}
|
||||
, {'ALREADY_EXISTS', <<"Resource already existed">>}
|
||||
, {'BAD_CONFIG_SCHEMA', <<"Configuration data is not legal">>}
|
||||
, {'BAD_LISTENER_ID', <<"Bad listener ID">>}
|
||||
, {'BAD_NODE_NAME', <<"Bad Node Name">>}
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
, bridge_type/1
|
||||
, resource_id/1
|
||||
, resource_id/2
|
||||
, bridge_id/2
|
||||
, parse_bridge_id/1
|
||||
]).
|
||||
|
||||
|
@ -46,7 +47,7 @@
|
|||
, recreate/3
|
||||
, create_dry_run/2
|
||||
, remove/1
|
||||
, remove/3
|
||||
, remove/2
|
||||
, update/2
|
||||
, update/3
|
||||
, start/2
|
||||
|
@ -202,8 +203,9 @@ lookup(Type, Name) ->
|
|||
lookup(Type, Name, RawConf) ->
|
||||
case emqx_resource:get_instance(resource_id(Type, Name)) of
|
||||
{error, not_found} -> {error, not_found};
|
||||
{ok, _, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data,
|
||||
raw_config => RawConf}}
|
||||
{ok, _, Data} ->
|
||||
{ok, #{type => Type, name => Name, resource_data => Data,
|
||||
raw_config => RawConf}}
|
||||
end.
|
||||
|
||||
start(Type, Name) ->
|
||||
|
@ -222,7 +224,8 @@ create(BridgeId, Conf) ->
|
|||
create(Type, Name, Conf) ->
|
||||
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
||||
config => Conf}),
|
||||
case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, emqx_bridge:resource_type(Type),
|
||||
case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>,
|
||||
emqx_bridge:resource_type(Type),
|
||||
parse_confs(Type, Name, Conf), #{async_create => true}) of
|
||||
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
|
||||
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
|
||||
|
@ -284,6 +287,10 @@ remove(BridgeId) ->
|
|||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||
remove(BridgeType, BridgeName, #{}).
|
||||
|
||||
remove(Type, Name) ->
|
||||
remove(Type, Name, undefined).
|
||||
|
||||
%% just for perform_bridge_changes/1
|
||||
remove(Type, Name, _Conf) ->
|
||||
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
||||
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
||||
|
|
|
@ -88,16 +88,18 @@ get_response_body_schema() ->
|
|||
bridge_info_examples(get)).
|
||||
|
||||
param_path_operation() ->
|
||||
path_param(operation, enum([start, stop, restart]), <<"start">>).
|
||||
|
||||
param_path_id() ->
|
||||
path_param(id, binary(), <<"http:my_http_bridge">>).
|
||||
|
||||
path_param(Name, Type, Example) ->
|
||||
{Name, mk(Type,
|
||||
{operation, mk(enum([start, stop, restart]),
|
||||
#{ in => path
|
||||
, required => true
|
||||
, example => Example
|
||||
, example => <<"start">>
|
||||
})}.
|
||||
|
||||
param_path_id() ->
|
||||
{id, mk(binary(),
|
||||
#{ in => path
|
||||
, required => true
|
||||
, example => <<"http:my_http_bridge">>
|
||||
, desc => <<"The bridge Id. Must be of format {type}:{name}">>
|
||||
})}.
|
||||
|
||||
bridge_info_array_example(Method) ->
|
||||
|
@ -132,36 +134,31 @@ info_example(Type, Direction, Method) ->
|
|||
maps:merge(info_example_basic(Type, Direction),
|
||||
method_example(Type, Direction, Method)).
|
||||
|
||||
method_example(Type, Direction, get) ->
|
||||
method_example(Type, Direction, Method) when Method == get; Method == post ->
|
||||
SType = atom_to_list(Type),
|
||||
SDir = atom_to_list(Direction),
|
||||
SName = case Type of
|
||||
http -> "my_" ++ SType ++ "_bridge";
|
||||
_ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
|
||||
end,
|
||||
#{
|
||||
id => bin(SType ++ ":" ++ SName),
|
||||
TypeNameExamp = #{
|
||||
type => bin(SType),
|
||||
name => bin(SName),
|
||||
name => bin(SName)
|
||||
},
|
||||
maybe_with_metrics_example(TypeNameExamp, Method);
|
||||
method_example(_Type, _Direction, put) ->
|
||||
#{}.
|
||||
|
||||
maybe_with_metrics_example(TypeNameExamp, get) ->
|
||||
TypeNameExamp#{
|
||||
metrics => ?METRICS(0, 0, 0, 0, 0, 0),
|
||||
node_metrics => [
|
||||
#{node => node(),
|
||||
metrics => ?METRICS(0, 0, 0, 0, 0, 0)}
|
||||
metrics => ?METRICS(0, 0, 0, 0, 0, 0)}
|
||||
]
|
||||
};
|
||||
method_example(Type, Direction, post) ->
|
||||
SType = atom_to_list(Type),
|
||||
SDir = atom_to_list(Direction),
|
||||
SName = case Type of
|
||||
http -> "my_" ++ SType ++ "_bridge";
|
||||
_ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
|
||||
end,
|
||||
#{
|
||||
type => bin(SType),
|
||||
name => bin(SName)
|
||||
};
|
||||
method_example(_Type, _Direction, put) ->
|
||||
#{}.
|
||||
maybe_with_metrics_example(TypeNameExamp, _) ->
|
||||
TypeNameExamp.
|
||||
|
||||
info_example_basic(http, _) ->
|
||||
#{
|
||||
|
@ -202,7 +199,7 @@ info_example_basic(mqtt, egress) ->
|
|||
|
||||
schema("/bridges") ->
|
||||
#{
|
||||
operationId => '/bridges',
|
||||
'operationId' => '/bridges',
|
||||
get => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"List Bridges">>,
|
||||
|
@ -216,8 +213,8 @@ schema("/bridges") ->
|
|||
post => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"Create Bridge">>,
|
||||
description => <<"Create a new bridge">>,
|
||||
requestBody => emqx_dashboard_swagger:schema_with_examples(
|
||||
description => <<"Create a new bridge by type and name">>,
|
||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_bridge_schema:post_request(),
|
||||
bridge_info_examples(post)),
|
||||
responses => #{
|
||||
|
@ -229,7 +226,7 @@ schema("/bridges") ->
|
|||
|
||||
schema("/bridges/:id") ->
|
||||
#{
|
||||
operationId => '/bridges/:id',
|
||||
'operationId' => '/bridges/:id',
|
||||
get => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"Get Bridge">>,
|
||||
|
@ -243,9 +240,9 @@ schema("/bridges/:id") ->
|
|||
put => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"Update Bridge">>,
|
||||
description => <<"Update a bridge">>,
|
||||
description => <<"Update a bridge by Id">>,
|
||||
parameters => [param_path_id()],
|
||||
requestBody => emqx_dashboard_swagger:schema_with_examples(
|
||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_bridge_schema:put_request(),
|
||||
bridge_info_examples(put)),
|
||||
responses => #{
|
||||
|
@ -257,7 +254,7 @@ schema("/bridges/:id") ->
|
|||
delete => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"Delete Bridge">>,
|
||||
description => <<"Delete a bridge">>,
|
||||
description => <<"Delete a bridge by Id">>,
|
||||
parameters => [param_path_id()],
|
||||
responses => #{
|
||||
204 => <<"Bridge deleted">>
|
||||
|
@ -267,11 +264,11 @@ schema("/bridges/:id") ->
|
|||
|
||||
schema("/bridges/:id/operation/:operation") ->
|
||||
#{
|
||||
operationId => '/bridges/:id/operation/:operation',
|
||||
'operationId' => '/bridges/:id/operation/:operation',
|
||||
post => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"Start/Stop/Restart Bridge">>,
|
||||
description => <<"Start/Stop/Restart bridges on a specific node">>,
|
||||
description => <<"Start/Stop/Restart bridges on a specific node.">>,
|
||||
parameters => [
|
||||
param_path_id(),
|
||||
param_path_operation()
|
||||
|
@ -283,9 +280,8 @@ schema("/bridges/:id/operation/:operation") ->
|
|||
}
|
||||
}.
|
||||
|
||||
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) ->
|
||||
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
|
||||
Conf = filter_out_request_body(Conf0),
|
||||
BridgeName = emqx_misc:gen_id(),
|
||||
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
||||
{ok, _} ->
|
||||
{400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
|
||||
|
@ -323,7 +319,7 @@ schema("/bridges/:id/operation/:operation") ->
|
|||
#{override_to => cluster}) of
|
||||
{ok, _} -> {204};
|
||||
{error, Reason} ->
|
||||
{500, error_msg('UNKNOWN_ERROR', Reason)}
|
||||
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||
end).
|
||||
|
||||
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||
|
@ -334,7 +330,7 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
|||
{ok, [{error, not_found} | _]} ->
|
||||
{404, error_msg('NOT_FOUND', <<"not_found">>)};
|
||||
{error, ErrL} ->
|
||||
{500, error_msg('UNKNOWN_ERROR', ErrL)}
|
||||
{500, error_msg('INTERNAL_ERROR', ErrL)}
|
||||
end.
|
||||
|
||||
lookup_from_local_node(BridgeType, BridgeName) ->
|
||||
|
@ -354,7 +350,7 @@ lookup_from_local_node(BridgeType, BridgeName) ->
|
|||
{error, {pre_config_update, _, bridge_not_found}} ->
|
||||
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
|
||||
{error, Reason} ->
|
||||
{500, error_msg('UNKNOWN_ERROR', Reason)}
|
||||
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||
end
|
||||
end).
|
||||
|
||||
|
@ -372,17 +368,19 @@ ensure_bridge_created(BridgeType, BridgeName, Conf) ->
|
|||
end.
|
||||
|
||||
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
|
||||
lists:foldl(fun(#{id := Id}, Acc) ->
|
||||
Bridges = pick_bridges_by_id(Id, BridgesAllNodes),
|
||||
lists:foldl(fun(#{type := Type, name := Name}, Acc) ->
|
||||
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
|
||||
[format_bridge_info(Bridges) | Acc]
|
||||
end, [], BridgesFirstNode).
|
||||
|
||||
pick_bridges_by_id(Id, BridgesAllNodes) ->
|
||||
pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
|
||||
lists:foldl(fun(BridgesOneNode, Acc) ->
|
||||
case [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id] of
|
||||
case [Bridge || Bridge = #{type := Type0, name := Name0} <- BridgesOneNode,
|
||||
Type0 == Type, Name0 == Name] of
|
||||
[BridgeInfo] -> [BridgeInfo | Acc];
|
||||
[] ->
|
||||
?SLOG(warning, #{msg => "bridge_inconsistent_in_cluster", bridge => Id}),
|
||||
?SLOG(warning, #{msg => "bridge_inconsistent_in_cluster",
|
||||
bridge => emqx_bridge:bridge_id(Type, Name)}),
|
||||
Acc
|
||||
end
|
||||
end, [], BridgesAllNodes).
|
||||
|
@ -420,11 +418,9 @@ aggregate_metrics(AllMetrics) ->
|
|||
Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
|
||||
end, InitMetrics, AllMetrics).
|
||||
|
||||
format_resp(#{id := Id, raw_config := RawConf,
|
||||
format_resp(#{type := Type, name := BridgeName, raw_config := RawConf,
|
||||
resource_data := #{status := Status, metrics := Metrics}}) ->
|
||||
{Type, BridgeName} = emqx_bridge:parse_bridge_id(Id),
|
||||
RawConf#{
|
||||
id => Id,
|
||||
type => Type,
|
||||
name => maps:get(<<"name">>, RawConf, BridgeName),
|
||||
node => node(),
|
||||
|
@ -447,7 +443,7 @@ is_ok(ResL) ->
|
|||
end.
|
||||
|
||||
filter_out_request_body(Conf) ->
|
||||
ExtraConfs = [<<"id">>, <<"type">>, <<"status">>, <<"node_status">>,
|
||||
ExtraConfs = [<<"id">>, <<"type">>, <<"name">>, <<"status">>, <<"node_status">>,
|
||||
<<"node_metrics">>, <<"metrics">>, <<"node">>],
|
||||
maps:without(ExtraConfs, Conf).
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
%% Hocon Schema Definitions
|
||||
roots() -> [].
|
||||
|
||||
fields("bridge") ->
|
||||
fields("config") ->
|
||||
basic_config() ++
|
||||
[ {url, mk(binary(),
|
||||
#{ required => true
|
||||
|
@ -27,8 +27,9 @@ is not allowed.
|
|||
#{ desc =>"""
|
||||
The MQTT topic filter to be forwarded to the HTTP server. All MQTT 'PUBLISH' messages with the topic
|
||||
matching the local_topic will be forwarded.<br/>
|
||||
NOTE: if this bridge is used as the output of a rule (EMQX rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that match
|
||||
local_topic will be forwarded.
|
||||
NOTE: if this bridge is used as the output of a rule (EMQX rule engine), and also local_topic is
|
||||
configured, then both the data got from the rule and the MQTT messages that match local_topic
|
||||
will be forwarded.
|
||||
"""
|
||||
})}
|
||||
, {method, mk(method(),
|
||||
|
@ -68,14 +69,14 @@ How long will the HTTP request timeout.
|
|||
|
||||
fields("post") ->
|
||||
[ type_field()
|
||||
] ++ fields("bridge");
|
||||
, name_field()
|
||||
] ++ fields("config");
|
||||
|
||||
fields("put") ->
|
||||
fields("bridge");
|
||||
fields("config");
|
||||
|
||||
fields("get") ->
|
||||
[ id_field()
|
||||
] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post").
|
||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
|
||||
|
||||
basic_config() ->
|
||||
[ {enable,
|
||||
|
@ -83,10 +84,6 @@ basic_config() ->
|
|||
#{ desc => "Enable or disable this bridge"
|
||||
, default => true
|
||||
})}
|
||||
, {name,
|
||||
mk(binary(),
|
||||
#{ desc => "Bridge name, used as a human-readable description of the bridge."
|
||||
})}
|
||||
, {direction,
|
||||
mk(egress,
|
||||
#{ desc => "The direction of this bridge, MUST be 'egress'"
|
||||
|
@ -96,11 +93,18 @@ basic_config() ->
|
|||
++ proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||
|
||||
%%======================================================================================
|
||||
id_field() ->
|
||||
{id, mk(binary(), #{desc => "The Bridge ID", example => "http:my_http_bridge"})}.
|
||||
|
||||
type_field() ->
|
||||
{type, mk(http, #{desc => "The Bridge Type"})}.
|
||||
{type, mk(http,
|
||||
#{ required => true
|
||||
, desc => "The Bridge Type"
|
||||
})}.
|
||||
|
||||
name_field() ->
|
||||
{name, mk(binary(),
|
||||
#{ required => true
|
||||
, desc => "Bridge name, used as a human-readable description of the bridge."
|
||||
})}.
|
||||
|
||||
method() ->
|
||||
enum([post, put, get, delete]).
|
||||
|
|
|
@ -24,9 +24,11 @@ fields("egress") ->
|
|||
|
||||
fields("post_ingress") ->
|
||||
[ type_field()
|
||||
, name_field()
|
||||
] ++ proplists:delete(enable, fields("ingress"));
|
||||
fields("post_egress") ->
|
||||
[ type_field()
|
||||
, name_field()
|
||||
] ++ proplists:delete(enable, fields("egress"));
|
||||
|
||||
fields("put_ingress") ->
|
||||
|
@ -35,15 +37,19 @@ fields("put_egress") ->
|
|||
proplists:delete(enable, fields("egress"));
|
||||
|
||||
fields("get_ingress") ->
|
||||
[ id_field()
|
||||
] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress");
|
||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress");
|
||||
fields("get_egress") ->
|
||||
[ id_field()
|
||||
] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress").
|
||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress").
|
||||
|
||||
%%======================================================================================
|
||||
id_field() ->
|
||||
{id, mk(binary(), #{desc => "The bridge ID", example => "mqtt:my_mqtt_bridge"})}.
|
||||
|
||||
type_field() ->
|
||||
{type, mk(mqtt, #{desc => "The bridge type"})}.
|
||||
{type, mk(mqtt,
|
||||
#{ required => true
|
||||
, desc => "The bridge type."
|
||||
})}.
|
||||
|
||||
name_field() ->
|
||||
{name, mk(binary(),
|
||||
#{ required => true
|
||||
, desc => "Bridge name, used as a human-readable description of the bridge."
|
||||
})}.
|
||||
|
|
|
@ -46,10 +46,6 @@ common_bridge_fields() ->
|
|||
#{ desc => "Enable or disable this bridge"
|
||||
, default => true
|
||||
})}
|
||||
, {name,
|
||||
mk(binary(),
|
||||
#{ desc => "Bridge name, used as a human-readable description of the bridge."
|
||||
})}
|
||||
, {connector,
|
||||
mk(binary(),
|
||||
#{ required => true
|
||||
|
@ -57,7 +53,8 @@ common_bridge_fields() ->
|
|||
, desc =>"""
|
||||
The connector ID to be used for this bridge. Connector IDs must be of format:
|
||||
<code>{type}:{name}</code>.<br>
|
||||
In config files, you can find the corresponding config entry for a connector by such path: 'connectors.{type}.{name}'.<br>
|
||||
In config files, you can find the corresponding config entry for a connector by such path:
|
||||
'connectors.{type}.{name}'.<br>
|
||||
"""
|
||||
})}
|
||||
].
|
||||
|
@ -67,7 +64,7 @@ metrics_status_fields() ->
|
|||
, {"node_metrics", mk(hoconsc:array(ref(?MODULE, "node_metrics")),
|
||||
#{ desc => "The metrics of the bridge for each node"
|
||||
})}
|
||||
, {"status", mk(ref(?MODULE, "status"), #{desc => "The status of the bridge"})}
|
||||
, {"status", mk(status(), #{desc => "The status of the bridge"})}
|
||||
, {"node_status", mk(hoconsc:array(ref(?MODULE, "node_status")),
|
||||
#{ desc => "The status of the bridge for each node"
|
||||
})}
|
||||
|
@ -86,7 +83,7 @@ direction_field(Dir, Desc) ->
|
|||
roots() -> [bridges].
|
||||
|
||||
fields(bridges) ->
|
||||
[{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}]
|
||||
[{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), #{})}]
|
||||
++ [{T, mk(hoconsc:map(name, hoconsc:union([
|
||||
ref(schema_mod(T), "ingress"),
|
||||
ref(schema_mod(T), "egress")
|
||||
|
@ -107,21 +104,14 @@ fields("node_metrics") ->
|
|||
, {"metrics", mk(ref(?MODULE, "metrics"), #{})}
|
||||
];
|
||||
|
||||
fields("status") ->
|
||||
[ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})}
|
||||
, {"success", mk(integer(), #{desc => "Count of query success"})}
|
||||
, {"failed", mk(integer(), #{desc => "Count of query failed"})}
|
||||
, {"rate", mk(float(), #{desc => "The rate of matched, times/second"})}
|
||||
, {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})}
|
||||
, {"rate_last5m", mk(float(),
|
||||
#{desc => "The average rate of matched in the last 5 minutes, times/second"})}
|
||||
];
|
||||
|
||||
fields("node_status") ->
|
||||
[ node_name()
|
||||
, {"status", mk(ref(?MODULE, "status"), #{})}
|
||||
, {"status", mk(status(), #{})}
|
||||
].
|
||||
|
||||
status() ->
|
||||
hoconsc:enum([connected, disconnected, connecting]).
|
||||
|
||||
node_name() ->
|
||||
{"node", mk(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}.
|
||||
|
||||
|
|
|
@ -55,7 +55,8 @@ init_per_suite(Config) ->
|
|||
%% some testcases (may from other app) already get emqx_connector started
|
||||
_ = application:stop(emqx_resource),
|
||||
_ = application:stop(emqx_connector),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_bridge, emqx_dashboard], fun set_special_configs/1),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_bridge, emqx_dashboard],
|
||||
fun set_special_configs/1),
|
||||
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?CONF_DEFAULT),
|
||||
Config.
|
||||
|
||||
|
@ -152,8 +153,7 @@ t_http_crud_apis(_) ->
|
|||
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
|
||||
|
||||
%ct:pal("---bridge: ~p", [Bridge]),
|
||||
#{ <<"id">> := BridgeID
|
||||
, <<"type">> := ?BRIDGE_TYPE
|
||||
#{ <<"type">> := ?BRIDGE_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME
|
||||
, <<"status">> := _
|
||||
, <<"node_status">> := [_|_]
|
||||
|
@ -162,6 +162,7 @@ t_http_crud_apis(_) ->
|
|||
, <<"url">> := URL1
|
||||
} = jsx:decode(Bridge),
|
||||
|
||||
BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
|
||||
%% send an message to emqx and the message should be forwarded to the HTTP server
|
||||
wait_for_resource_ready(BridgeID, 5),
|
||||
Body = <<"my msg">>,
|
||||
|
@ -181,8 +182,7 @@ t_http_crud_apis(_) ->
|
|||
URL2 = ?URL(Port, "path2"),
|
||||
{ok, 200, Bridge2} = request(put, uri(["bridges", BridgeID]),
|
||||
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
|
||||
?assertMatch(#{ <<"id">> := BridgeID
|
||||
, <<"type">> := ?BRIDGE_TYPE
|
||||
?assertMatch(#{ <<"type">> := ?BRIDGE_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME
|
||||
, <<"status">> := _
|
||||
, <<"node_status">> := [_|_]
|
||||
|
@ -193,8 +193,7 @@ t_http_crud_apis(_) ->
|
|||
|
||||
%% list all bridges again, assert Bridge2 is in it
|
||||
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
|
||||
?assertMatch([#{ <<"id">> := BridgeID
|
||||
, <<"type">> := ?BRIDGE_TYPE
|
||||
?assertMatch([#{ <<"type">> := ?BRIDGE_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME
|
||||
, <<"status">> := _
|
||||
, <<"node_status">> := [_|_]
|
||||
|
@ -205,8 +204,7 @@ t_http_crud_apis(_) ->
|
|||
|
||||
%% get the bridge by id
|
||||
{ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeID
|
||||
, <<"type">> := ?BRIDGE_TYPE
|
||||
?assertMatch(#{ <<"type">> := ?BRIDGE_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME
|
||||
, <<"status">> := _
|
||||
, <<"node_status">> := [_|_]
|
||||
|
@ -251,8 +249,7 @@ t_start_stop_bridges(_) ->
|
|||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
|
||||
%ct:pal("the bridge ==== ~p", [Bridge]),
|
||||
#{ <<"id">> := BridgeID
|
||||
, <<"type">> := ?BRIDGE_TYPE
|
||||
#{ <<"type">> := ?BRIDGE_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME
|
||||
, <<"status">> := _
|
||||
, <<"node_status">> := [_|_]
|
||||
|
@ -260,31 +257,28 @@ t_start_stop_bridges(_) ->
|
|||
, <<"node_metrics">> := [_|_]
|
||||
, <<"url">> := URL1
|
||||
} = jsx:decode(Bridge),
|
||||
BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
|
||||
%% stop it
|
||||
{ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeID
|
||||
, <<"status">> := <<"disconnected">>
|
||||
?assertMatch(#{ <<"status">> := <<"disconnected">>
|
||||
}, jsx:decode(Bridge2)),
|
||||
%% start again
|
||||
{ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeID
|
||||
, <<"status">> := <<"connected">>
|
||||
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||
}, jsx:decode(Bridge3)),
|
||||
%% restart an already started bridge
|
||||
{ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeID
|
||||
, <<"status">> := <<"connected">>
|
||||
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||
}, jsx:decode(Bridge3)),
|
||||
%% stop it again
|
||||
{ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
|
||||
%% restart a stopped bridge
|
||||
{ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeID
|
||||
, <<"status">> := <<"connected">>
|
||||
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||
}, jsx:decode(Bridge4)),
|
||||
%% delete the bridge
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
|
||||
|
|
|
@ -40,16 +40,15 @@ config_key_path() ->
|
|||
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
|
||||
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
|
||||
ConnId = connector_id(Type, Name),
|
||||
try foreach_linked_bridges(ConnId, fun(#{id := BId}) ->
|
||||
throw({dependency_bridges_exist, BId})
|
||||
try foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) ->
|
||||
throw({dependency_bridges_exist, emqx_bridge:bridge_id(BType, BName)})
|
||||
end)
|
||||
catch throw:Error -> {error, Error}
|
||||
end;
|
||||
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
|
||||
ConnId = connector_id(Type, Name),
|
||||
foreach_linked_bridges(ConnId,
|
||||
fun(#{id := BId}) ->
|
||||
{BType, BName} = emqx_bridge:parse_bridge_id(BId),
|
||||
fun(#{type := BType, name := BName}) ->
|
||||
BridgeConf = emqx:get_config([bridges, BType, BName]),
|
||||
case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf},
|
||||
BridgeConf#{connector => NewConf}}) of
|
||||
|
@ -72,7 +71,7 @@ parse_connector_id(ConnectorId) ->
|
|||
list() ->
|
||||
lists:foldl(fun({Type, NameAndConf}, Connectors) ->
|
||||
lists:foldl(fun({Name, RawConf}, Acc) ->
|
||||
[RawConf#{<<"id">> => connector_id(Type, Name)} | Acc]
|
||||
[RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc]
|
||||
end, Connectors, maps:to_list(NameAndConf))
|
||||
end, [], maps:to_list(emqx:get_raw_config(config_key_path(), #{}))).
|
||||
|
||||
|
@ -81,10 +80,9 @@ lookup(Id) when is_binary(Id) ->
|
|||
lookup(Type, Name).
|
||||
|
||||
lookup(Type, Name) ->
|
||||
Id = connector_id(Type, Name),
|
||||
case emqx:get_raw_config(config_key_path() ++ [Type, Name], not_found) of
|
||||
not_found -> {error, not_found};
|
||||
Conf -> {ok, Conf#{<<"id">> => Id}}
|
||||
Conf -> {ok, Conf#{<<"type">> => Type, <<"name">> => Name}}
|
||||
end.
|
||||
|
||||
create_dry_run(Type, Conf) ->
|
||||
|
|
|
@ -85,15 +85,7 @@ info_example(Type, Method) ->
|
|||
maps:merge(info_example_basic(Type),
|
||||
method_example(Type, Method)).
|
||||
|
||||
method_example(Type, get) ->
|
||||
SType = atom_to_list(Type),
|
||||
SName = "my_" ++ SType ++ "_connector",
|
||||
#{
|
||||
id => bin(SType ++ ":" ++ SName),
|
||||
type => bin(SType),
|
||||
name => bin(SName)
|
||||
};
|
||||
method_example(Type, post) ->
|
||||
method_example(Type, Method) when Method == get; Method == post ->
|
||||
SType = atom_to_list(Type),
|
||||
SName = "my_" ++ SType ++ "_connector",
|
||||
#{
|
||||
|
@ -122,17 +114,21 @@ info_example_basic(mqtt) ->
|
|||
}.
|
||||
|
||||
param_path_id() ->
|
||||
[{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}].
|
||||
[{id, mk(binary(),
|
||||
#{ in => path
|
||||
, example => <<"mqtt:my_mqtt_connector">>
|
||||
, desc => <<"The connector Id. Must be of format {type}:{name}">>
|
||||
})}].
|
||||
|
||||
schema("/connectors_test") ->
|
||||
#{
|
||||
operationId => '/connectors_test',
|
||||
'operationId' => '/connectors_test',
|
||||
post => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"Test creating a new connector by given Id <br>"
|
||||
"The ID must be of format '{type}:{name}'">>,
|
||||
summary => <<"Test creating connector">>,
|
||||
requestBody => post_request_body_schema(),
|
||||
'requestBody' => post_request_body_schema(),
|
||||
responses => #{
|
||||
204 => <<"Test connector OK">>,
|
||||
400 => error_schema(['TEST_FAILED'], "connector test failed")
|
||||
|
@ -142,7 +138,7 @@ schema("/connectors_test") ->
|
|||
|
||||
schema("/connectors") ->
|
||||
#{
|
||||
operationId => '/connectors',
|
||||
'operationId' => '/connectors',
|
||||
get => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"List all connectors">>,
|
||||
|
@ -157,7 +153,7 @@ schema("/connectors") ->
|
|||
tags => [<<"connectors">>],
|
||||
description => <<"Create a new connector">>,
|
||||
summary => <<"Create connector">>,
|
||||
requestBody => post_request_body_schema(),
|
||||
'requestBody' => post_request_body_schema(),
|
||||
responses => #{
|
||||
201 => get_response_body_schema(),
|
||||
400 => error_schema(['ALREADY_EXISTS'], "connector already exists")
|
||||
|
@ -167,7 +163,7 @@ schema("/connectors") ->
|
|||
|
||||
schema("/connectors/:id") ->
|
||||
#{
|
||||
operationId => '/connectors/:id',
|
||||
'operationId' => '/connectors/:id',
|
||||
get => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"Get the connector by Id">>,
|
||||
|
@ -183,7 +179,7 @@ schema("/connectors/:id") ->
|
|||
description => <<"Update an existing connector by Id">>,
|
||||
summary => <<"Update connector">>,
|
||||
parameters => param_path_id(),
|
||||
requestBody => put_request_body_schema(),
|
||||
'requestBody' => put_request_body_schema(),
|
||||
responses => #{
|
||||
200 => get_response_body_schema(),
|
||||
404 => error_schema(['NOT_FOUND'], "Connector not found")
|
||||
|
@ -211,8 +207,7 @@ schema("/connectors/:id") ->
|
|||
'/connectors'(get, _Request) ->
|
||||
{200, [format_resp(Conn) || Conn <- emqx_connector:list()]};
|
||||
|
||||
'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
|
||||
ConnName = emqx_misc:gen_id(),
|
||||
'/connectors'(post, #{body := #{<<"type">> := ConnType, <<"name">> := ConnName} = Params}) ->
|
||||
case emqx_connector:lookup(ConnType, ConnName) of
|
||||
{ok, _} ->
|
||||
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
|
||||
|
@ -220,8 +215,8 @@ schema("/connectors/:id") ->
|
|||
case emqx_connector:update(ConnType, ConnName,
|
||||
filter_out_request_body(Params)) of
|
||||
{ok, #{raw_config := RawConf}} ->
|
||||
Id = emqx_connector:connector_id(ConnType, ConnName),
|
||||
{201, format_resp(Id, RawConf)};
|
||||
{201, format_resp(RawConf#{<<"type">> => ConnType,
|
||||
<<"name">> => ConnName})};
|
||||
{error, Error} ->
|
||||
{400, error_msg('ALREADY_EXISTS', Error)}
|
||||
end
|
||||
|
@ -231,7 +226,7 @@ schema("/connectors/:id") ->
|
|||
?TRY_PARSE_ID(Id,
|
||||
case emqx_connector:lookup(ConnType, ConnName) of
|
||||
{ok, Conf} ->
|
||||
{200, format_resp(Id, Conf)};
|
||||
{200, format_resp(Conf)};
|
||||
{error, not_found} ->
|
||||
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
|
||||
end);
|
||||
|
@ -243,7 +238,8 @@ schema("/connectors/:id") ->
|
|||
{ok, _} ->
|
||||
case emqx_connector:update(ConnType, ConnName, Params) of
|
||||
{ok, #{raw_config := RawConf}} ->
|
||||
{200, format_resp(Id, RawConf)};
|
||||
{200, format_resp(RawConf#{<<"type">> => ConnType,
|
||||
<<"name">> => ConnName})};
|
||||
{error, Error} ->
|
||||
{500, error_msg('INTERNAL_ERROR', Error)}
|
||||
end;
|
||||
|
@ -274,21 +270,17 @@ error_msg(Code, Msg) when is_binary(Msg) ->
|
|||
error_msg(Code, Msg) ->
|
||||
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
|
||||
|
||||
format_resp(#{<<"id">> := Id} = RawConf) ->
|
||||
format_resp(Id, RawConf).
|
||||
|
||||
format_resp(ConnId, RawConf) ->
|
||||
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
|
||||
{Type, ConnName} = emqx_connector:parse_connector_id(ConnId),
|
||||
format_resp(#{<<"type">> := ConnType, <<"name">> := ConnName} = RawConf) ->
|
||||
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(
|
||||
emqx_connector:connector_id(ConnType, ConnName))),
|
||||
RawConf#{
|
||||
<<"id">> => ConnId,
|
||||
<<"type">> => Type,
|
||||
<<"name">> => maps:get(<<"name">>, RawConf, ConnName),
|
||||
<<"type">> => ConnType,
|
||||
<<"name">> => ConnName,
|
||||
<<"num_of_bridges">> => NumOfBridges
|
||||
}.
|
||||
|
||||
filter_out_request_body(Conf) ->
|
||||
ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>],
|
||||
ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>],
|
||||
maps:without(ExtraConfs, Conf).
|
||||
|
||||
bin(S) when is_list(S) ->
|
||||
|
|
|
@ -54,11 +54,7 @@ fields("config") ->
|
|||
emqx_connector_mqtt_schema:fields("config");
|
||||
|
||||
fields("get") ->
|
||||
[ {id, mk(binary(),
|
||||
#{ desc => "The connector Id"
|
||||
, example => <<"mqtt:my_mqtt_connector">>
|
||||
})}
|
||||
, {num_of_bridges, mk(integer(),
|
||||
[ {num_of_bridges, mk(integer(),
|
||||
#{ desc => "The current number of bridges that are using this connector"
|
||||
})}
|
||||
] ++ fields("post");
|
||||
|
@ -67,7 +63,14 @@ fields("put") ->
|
|||
emqx_connector_mqtt_schema:fields("connector");
|
||||
|
||||
fields("post") ->
|
||||
[ {type, mk(mqtt, #{desc => "The Connector Type"})}
|
||||
[ {type, mk(mqtt,
|
||||
#{ required => true
|
||||
, desc => "The Connector Type"
|
||||
})}
|
||||
, {name, mk(binary(),
|
||||
#{ required => true
|
||||
, desc => "Connector name, used as a human-readable description of the connector."
|
||||
})}
|
||||
] ++ fields("put").
|
||||
|
||||
%% ===================================================================
|
||||
|
|
|
@ -55,11 +55,6 @@ clientid conflicts between different nodes. And we can only use shared subscript
|
|||
topic filters for 'remote_topic' of ingress connections.
|
||||
"""
|
||||
})}
|
||||
, {name,
|
||||
sc(binary(),
|
||||
#{ required => false
|
||||
, desc => "Connector name, used as a human-readable description of the connector."
|
||||
})}
|
||||
, {server,
|
||||
sc(emqx_schema:ip_port(),
|
||||
#{ default => "127.0.0.1:1883"
|
||||
|
|
|
@ -97,8 +97,8 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_common_test_helpers:stop_apps([emqx_rule_engine,
|
||||
emqx_connector, emqx_bridge, emqx_dashboard]),
|
||||
emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge,
|
||||
emqx_dashboard]),
|
||||
ok.
|
||||
|
||||
set_special_configs(emqx_dashboard) ->
|
||||
|
@ -132,11 +132,11 @@ clear_resources() ->
|
|||
lists:foreach(fun(#{id := Id}) ->
|
||||
ok = emqx_rule_engine:delete_rule(Id)
|
||||
end, emqx_rule_engine:get_rules()),
|
||||
lists:foreach(fun(#{id := Id}) ->
|
||||
ok = emqx_bridge:remove(Id)
|
||||
lists:foreach(fun(#{type := Type, name := Name}) ->
|
||||
ok = emqx_bridge:remove(Type, Name)
|
||||
end, emqx_bridge:list()),
|
||||
lists:foreach(fun(#{<<"id">> := Id}) ->
|
||||
ok = emqx_connector:delete(Id)
|
||||
lists:foreach(fun(#{type := Type, name := Name}) ->
|
||||
ok = emqx_connector:delete(Type, Name)
|
||||
end, emqx_connector:list()).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -155,8 +155,7 @@ t_mqtt_crud_apis(_) ->
|
|||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
|
||||
#{ <<"id">> := ConnctorID
|
||||
, <<"type">> := ?CONNECTR_TYPE
|
||||
#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?CONNECTR_NAME
|
||||
, <<"server">> := <<"127.0.0.1:1883">>
|
||||
, <<"username">> := User1
|
||||
|
@ -164,12 +163,13 @@ t_mqtt_crud_apis(_) ->
|
|||
, <<"proto_ver">> := <<"v4">>
|
||||
, <<"ssl">> := #{<<"enable">> := false}
|
||||
} = jsx:decode(Connector),
|
||||
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
%% update the request-path of the connector
|
||||
User2 = <<"user2">>,
|
||||
{ok, 200, Connector2} = request(put, uri(["connectors", ConnctorID]),
|
||||
?MQTT_CONNECTOR(User2)),
|
||||
?assertMatch(#{ <<"id">> := ConnctorID
|
||||
?assertMatch(#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?CONNECTR_NAME
|
||||
, <<"server">> := <<"127.0.0.1:1883">>
|
||||
, <<"username">> := User2
|
||||
, <<"password">> := <<"">>
|
||||
|
@ -179,8 +179,7 @@ t_mqtt_crud_apis(_) ->
|
|||
|
||||
%% list all connectors again, assert Connector2 is in it
|
||||
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
|
||||
?assertMatch([#{ <<"id">> := ConnctorID
|
||||
, <<"type">> := ?CONNECTR_TYPE
|
||||
?assertMatch([#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?CONNECTR_NAME
|
||||
, <<"server">> := <<"127.0.0.1:1883">>
|
||||
, <<"username">> := User2
|
||||
|
@ -191,8 +190,7 @@ t_mqtt_crud_apis(_) ->
|
|||
|
||||
%% get the connector by id
|
||||
{ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []),
|
||||
?assertMatch(#{ <<"id">> := ConnctorID
|
||||
, <<"type">> := ?CONNECTR_TYPE
|
||||
?assertMatch(#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?CONNECTR_NAME
|
||||
, <<"server">> := <<"127.0.0.1:1883">>
|
||||
, <<"username">> := User2
|
||||
|
@ -222,7 +220,8 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
|
||||
#{ <<"id">> := ConnctorID
|
||||
#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?CONNECTR_NAME
|
||||
, <<"server">> := <<"127.0.0.1:1883">>
|
||||
, <<"num_of_bridges">> := 0
|
||||
, <<"username">> := User1
|
||||
|
@ -230,7 +229,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|||
, <<"proto_ver">> := <<"v4">>
|
||||
, <<"ssl">> := #{<<"enable">> := false}
|
||||
} = jsx:decode(Connector),
|
||||
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
%% ... and a MQTT bridge, using POST
|
||||
%% we bind this bridge to the connector created just now
|
||||
timer:sleep(50),
|
||||
|
@ -239,10 +238,11 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_INGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDIngress
|
||||
, <<"type">> := <<"mqtt">>
|
||||
#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME_INGRESS
|
||||
, <<"connector">> := ConnctorID
|
||||
} = jsx:decode(Bridge),
|
||||
BridgeIDIngress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS),
|
||||
wait_for_resource_ready(BridgeIDIngress, 5),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
|
@ -269,8 +269,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|||
|
||||
%% get the connector by id, verify the num_of_bridges now is 1
|
||||
{ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []),
|
||||
?assertMatch(#{ <<"id">> := ConnctorID
|
||||
, <<"num_of_bridges">> := 1
|
||||
?assertMatch(#{ <<"num_of_bridges">> := 1
|
||||
}, jsx:decode(Connector1Str)),
|
||||
|
||||
%% delete the bridge
|
||||
|
@ -291,14 +290,13 @@ t_mqtt_conn_bridge_egress(_) ->
|
|||
}),
|
||||
|
||||
%ct:pal("---connector: ~p", [Connector]),
|
||||
#{ <<"id">> := ConnctorID
|
||||
, <<"server">> := <<"127.0.0.1:1883">>
|
||||
#{ <<"server">> := <<"127.0.0.1:1883">>
|
||||
, <<"username">> := User1
|
||||
, <<"password">> := <<"">>
|
||||
, <<"proto_ver">> := <<"v4">>
|
||||
, <<"ssl">> := #{<<"enable">> := false}
|
||||
} = jsx:decode(Connector),
|
||||
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
%% ... and a MQTT bridge, using POST
|
||||
%% we bind this bridge to the connector created just now
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
|
@ -306,11 +304,11 @@ t_mqtt_conn_bridge_egress(_) ->
|
|||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDEgress
|
||||
, <<"type">> := ?CONNECTR_TYPE
|
||||
#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
||||
, <<"connector">> := ConnctorID
|
||||
} = jsx:decode(Bridge),
|
||||
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
|
@ -338,8 +336,7 @@ t_mqtt_conn_bridge_egress(_) ->
|
|||
|
||||
%% verify the metrics of the bridge
|
||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
||||
, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
|
||||
?assertMatch(#{ <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
|
||||
, <<"node_metrics">> :=
|
||||
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
|
||||
}, jsx:decode(BridgeStr)),
|
||||
|
@ -365,10 +362,9 @@ t_mqtt_conn_update(_) ->
|
|||
}),
|
||||
|
||||
%ct:pal("---connector: ~p", [Connector]),
|
||||
#{ <<"id">> := ConnctorID
|
||||
, <<"server">> := <<"127.0.0.1:1883">>
|
||||
#{ <<"server">> := <<"127.0.0.1:1883">>
|
||||
} = jsx:decode(Connector),
|
||||
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
%% ... and a MQTT bridge, using POST
|
||||
%% we bind this bridge to the connector created just now
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
|
@ -376,11 +372,11 @@ t_mqtt_conn_update(_) ->
|
|||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDEgress
|
||||
, <<"type">> := <<"mqtt">>
|
||||
#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
||||
, <<"connector">> := ConnctorID
|
||||
} = jsx:decode(Bridge),
|
||||
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||
|
||||
%% then we try to update 'server' of the connector, to an unavailable IP address
|
||||
|
@ -407,10 +403,9 @@ t_mqtt_conn_update2(_) ->
|
|||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
|
||||
#{ <<"id">> := ConnctorID
|
||||
, <<"server">> := <<"127.0.0.1:2603">>
|
||||
#{ <<"server">> := <<"127.0.0.1:2603">>
|
||||
} = jsx:decode(Connector),
|
||||
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
%% ... and a MQTT bridge, using POST
|
||||
%% we bind this bridge to the connector created just now
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
|
@ -418,12 +413,12 @@ t_mqtt_conn_update2(_) ->
|
|||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDEgress
|
||||
, <<"type">> := <<"mqtt">>
|
||||
#{ <<"type">> := ?CONNECTR_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
||||
, <<"status">> := <<"disconnected">>
|
||||
, <<"connector">> := ConnctorID
|
||||
} = jsx:decode(Bridge),
|
||||
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||
%% We try to fix the 'server' parameter, to another unavailable server..
|
||||
%% The update should success: we don't check the connectivity of the new config
|
||||
%% if the resource is now disconnected.
|
||||
|
@ -434,8 +429,7 @@ t_mqtt_conn_update2(_) ->
|
|||
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)),
|
||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
||||
, <<"status">> := <<"connected">>
|
||||
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||
}, jsx:decode(BridgeStr)),
|
||||
%% delete the bridge
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||
|
@ -447,13 +441,12 @@ t_mqtt_conn_update2(_) ->
|
|||
|
||||
t_mqtt_conn_update3(_) ->
|
||||
%% we add a mqtt connector, using POST
|
||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||
{ok, 201, _} = request(post, uri(["connectors"]),
|
||||
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)
|
||||
#{ <<"type">> => ?CONNECTR_TYPE
|
||||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
%% ... and a MQTT bridge, using POST
|
||||
%% we bind this bridge to the connector created just now
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
|
@ -461,9 +454,9 @@ t_mqtt_conn_update3(_) ->
|
|||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDEgress
|
||||
, <<"connector">> := ConnctorID
|
||||
#{ <<"connector">> := ConnctorID
|
||||
} = jsx:decode(Bridge),
|
||||
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||
|
||||
%% delete the connector should fail because it is in use by a bridge
|
||||
|
@ -488,18 +481,18 @@ t_mqtt_conn_testing(_) ->
|
|||
}).
|
||||
|
||||
t_ingress_mqtt_bridge_with_rules(_) ->
|
||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||
{ok, 201, _} = request(post, uri(["connectors"]),
|
||||
?MQTT_CONNECTOR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
|
||||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
{ok, 201, _} = request(post, uri(["bridges"]),
|
||||
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
|
||||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_INGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge),
|
||||
BridgeIDIngress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS),
|
||||
|
||||
{ok, 201, Rule} = request(post, uri(["rules"]),
|
||||
#{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
|
||||
|
@ -569,18 +562,18 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
|||
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
||||
|
||||
t_egress_mqtt_bridge_with_rules(_) ->
|
||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||
{ok, 201, _} = request(post, uri(["connectors"]),
|
||||
?MQTT_CONNECTOR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
|
||||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||
|
||||
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
||||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge),
|
||||
#{ <<"type">> := ?CONNECTR_TYPE, <<"name">> := ?BRIDGE_NAME_EGRESS } = jsx:decode(Bridge),
|
||||
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||
|
||||
{ok, 201, Rule} = request(post, uri(["rules"]),
|
||||
#{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
|
||||
|
@ -655,8 +648,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|||
|
||||
%% verify the metrics of the bridge
|
||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
||||
, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
|
||||
?assertMatch(#{ <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
|
||||
, <<"node_metrics">> :=
|
||||
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
|
||||
}, jsx:decode(BridgeStr)),
|
||||
|
|
|
@ -68,7 +68,7 @@ schema("/banned") ->
|
|||
responses => #{
|
||||
200 => [{data, hoconsc:mk(hoconsc:array(hoconsc:ref(ban)), #{})}],
|
||||
400 => emqx_dashboard_swagger:error_codes(
|
||||
['ALREADY_EXISTED', 'BAD_REQUEST'],
|
||||
['ALREADY_EXISTS', 'BAD_REQUEST'],
|
||||
<<"Banned already existed, or bad args">>)
|
||||
}
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ banned(post, #{body := Body}) ->
|
|||
case emqx_banned:create(Ban) of
|
||||
{ok, Banned} -> {200, format(Banned)};
|
||||
{error, {already_exist, Old}} ->
|
||||
{400, 'ALREADY_EXISTED', format(Old)}
|
||||
{400, 'ALREADY_EXISTS', format(Old)}
|
||||
end
|
||||
end.
|
||||
|
||||
|
|
|
@ -264,7 +264,7 @@ trace(post, #{body := Param}) ->
|
|||
{ok, Trace0} -> {200, format_trace(Trace0)};
|
||||
{error, {already_existed, Name}} ->
|
||||
{400, #{
|
||||
code => 'ALREADY_EXISTED',
|
||||
code => 'ALREADY_EXISTS',
|
||||
message => ?TO_BIN([Name, " Already Exists"])
|
||||
}};
|
||||
{error, {duplicate_condition, Name}} ->
|
||||
|
@ -343,7 +343,8 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
|||
_ -> Acc
|
||||
end;
|
||||
{error, Node, Reason} ->
|
||||
?SLOG(error, #{msg => "download_trace_log_error", node => Node, log => TraceLog, reason => Reason}),
|
||||
?SLOG(error, #{msg => "download_trace_log_error", node => Node,
|
||||
log => TraceLog, reason => Reason}),
|
||||
Acc
|
||||
end
|
||||
end, [], TraceFiles).
|
||||
|
@ -375,7 +376,8 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
|
|||
{200, #{meta => Meta, items => <<"">>}};
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "read_file_failed",
|
||||
node => Node, name => Name, reason => Reason, position => Position, bytes => Bytes}),
|
||||
node => Node, name => Name, reason => Reason,
|
||||
position => Position, bytes => Bytes}),
|
||||
{400, #{code => 'READ_FILE_ERROR', message => Reason}};
|
||||
{badrpc, nodedown} ->
|
||||
{400, #{code => 'RPC_ERROR', message => "BadRpc node down"}}
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
filter => "*.erl",
|
||||
ruleset => erl_files,
|
||||
rules => [
|
||||
{elvis_style, macro_names, disable},
|
||||
{elvis_style, function_naming_convention, disable},
|
||||
{elvis_style, state_record_and_type, disable},
|
||||
{elvis_style, no_common_caveats_call, #{}},
|
||||
{elvis_style, no_debug_call, #{ debug_functions => [ {ct, pal}
|
||||
|
@ -19,15 +21,15 @@
|
|||
{right, "||"},
|
||||
{left, "||"}]}},
|
||||
{elvis_style, dont_repeat_yourself, #{ min_complexity => 20 }},
|
||||
{elvis_style, god_modules, #{ignore => [emqx_authentication,
|
||||
emqx_resource]}}
|
||||
{elvis_style, god_modules, #{limit => 100}}
|
||||
]
|
||||
},
|
||||
#{dirs => ["test", "apps/**/test"],
|
||||
filter => "*.erl",
|
||||
rules => [
|
||||
{elvis_text_style, line_length, #{ limit => 100
|
||||
, skip_comments => false }},
|
||||
, skip_comments => false
|
||||
}},
|
||||
{elvis_style, dont_repeat_yourself, #{ min_complexity => 100 }}
|
||||
]
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue