Merge pull request #6387 from terry-xiaoyu/improve_connector_apis

Improve bridge and connector APIs
This commit is contained in:
Shawn 2021-12-09 10:54:55 +08:00 committed by GitHub
commit f7941cd2bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 164 additions and 100 deletions

View File

@ -4,6 +4,7 @@
## MQTT bridges to/from another MQTT broker
#bridges.mqtt.my_ingress_mqtt_bridge {
# enable = true
# connector = "mqtt:my_mqtt_connector"
# direction = ingress
# ## topic mappings for this bridge
@ -16,6 +17,7 @@
#}
#
#bridges.mqtt.my_egress_mqtt_bridge {
# enable = true
# connector = "mqtt:my_mqtt_connector"
# direction = egress
# ## topic mappings for this bridge
@ -28,6 +30,7 @@
#
## HTTP bridges to an HTTP server
#bridges.http.my_http_bridge {
# enable = true
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
# url = "http://localhost:9901/messages/${topic}"
# request_timeout = "30s"

View File

@ -18,7 +18,8 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-export([post_config_update/5]).
-export([ post_config_update/5
]).
-export([ load_hook/0
, unload_hook/0
@ -37,6 +38,7 @@
, lookup/2
, lookup/3
, list/0
, list_bridges_by_connector/1
, create/3
, recreate/2
, recreate/3
@ -102,14 +104,13 @@ bridge_type(emqx_connector_http) -> http.
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_confs(NewConf, OldConf),
Result = perform_bridge_changes([
_ = perform_bridge_changes([
{fun remove/3, Removed},
{fun create/3, Added},
{fun update/3, Updated}
]),
ok = unload_hook(),
ok = load_hook(NewConf),
Result.
ok = load_hook(NewConf).
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).
@ -160,6 +161,10 @@ list() ->
end, Bridges, maps:to_list(NameAndConf))
end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))).
list_bridges_by_connector(ConnectorId) ->
[B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
ConnectorId =:= Id].
lookup(Type, Name) ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf).
@ -182,15 +187,11 @@ restart(Type, Name) ->
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
ResId = resource_id(Type, Name),
case emqx_resource:create_local(ResId,
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
{ok, already_created} ->
emqx_resource:get_instance(ResId);
{ok, Data} ->
{ok, Data};
{error, Reason} ->
{error, Reason}
case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf)) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason}
end.
update(Type, Name, {_OldConf, Conf}) ->
@ -204,7 +205,10 @@ update(Type, Name, {_OldConf, Conf}) ->
%%
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
recreate(Type, Name, Conf).
case recreate(Type, Name, Conf) of
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, _} = Err -> Err
end.
recreate(Type, Name) ->
recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
@ -321,6 +325,11 @@ parse_url(Url) ->
error({invalid_url, Url})
end.
maybe_disable_bridge(Type, Name, Conf) ->
case maps:get(enable, Conf, true) of
false -> stop(Type, Name);
true -> ok
end.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);

View File

@ -26,7 +26,7 @@
%% API callbacks
-export(['/bridges'/2, '/bridges/:id'/2,
'/nodes/:node/bridges/:id/operation/:operation'/2]).
'/bridges/:id/operation/:operation'/2]).
-export([ list_local_bridges/1
, lookup_from_local_node/2
@ -38,11 +38,12 @@
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
{BridgeType, BridgeName} -> EXPR
{BridgeType, BridgeName} ->
EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
". Bridge Ids must be of format {type}:{name}">>}}
{400, error_msg('INVALID_ID', <<"invalid_bridge_id: ", Id0/binary,
". Bridge Ids must be of format {type}:{name}">>)}
end).
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
@ -67,7 +68,7 @@ namespace() -> "bridge".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"].
paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
@ -78,9 +79,6 @@ get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
bridge_info_examples(get)).
param_path_node() ->
path_param(node, binary(), atom_to_binary(node(), utf8)).
param_path_operation() ->
path_param(operation, enum([start, stop, restart]), <<"start">>).
@ -129,7 +127,10 @@ info_example(Type, Direction, Method) ->
method_example(Type, Direction, get) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
SName = case Type of
http -> "my_" ++ SType ++ "_bridge";
_ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
end,
#{
id => bin(SType ++ ":" ++ SName),
type => bin(SType),
@ -138,7 +139,10 @@ method_example(Type, Direction, get) ->
method_example(Type, Direction, post) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
SName = case Type of
http -> "my_" ++ SType ++ "_bridge";
_ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
end,
#{
type => bin(SType),
name => bin(SName)
@ -247,15 +251,14 @@ schema("/bridges/:id") ->
}
};
schema("/nodes/:node/bridges/:id/operation/:operation") ->
schema("/bridges/:id/operation/:operation") ->
#{
operationId => '/nodes/:node/bridges/:id/operation/:operation',
operationId => '/bridges/:id/operation/:operation',
post => #{
tags => [<<"bridges">>],
summary => <<"Start/Stop/Restart Bridge">>,
description => <<"Start/Stop/Restart bridges on a specific node">>,
parameters => [
param_path_node(),
param_path_id(),
param_path_operation()
],
@ -269,7 +272,8 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
{error, not_found} ->
case ensure_bridge_created(BridgeType, BridgeName, Conf) of
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
@ -296,7 +300,7 @@ list_local_bridges(Node) ->
{error, Error} -> {400, Error}
end;
{error, not_found} ->
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
{404, error_msg('NOT_FOUND',<<"bridge not found">>)}
end);
'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
@ -305,7 +309,7 @@ list_local_bridges(Node) ->
#{override_to => cluster}) of
{ok, _} -> {204};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
{500, error_msg('UNKNOWN_ERROR', Reason)}
end).
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
@ -324,20 +328,25 @@ lookup_from_local_node(BridgeType, BridgeName) ->
Error -> Error
end.
'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
#{node := Node, id := Id, operation := Op}}) ->
OperFun =
fun (<<"start">>) -> start;
(<<"stop">>) -> stop;
(<<"restart">>) -> restart
end,
?TRY_PARSE_ID(Id,
case rpc_call(binary_to_atom(Node, latin1), emqx_bridge, OperFun(Op),
[BridgeType, BridgeName]) of
ok -> {200};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
'/bridges/:id/operation/:operation'(post, #{bindings :=
#{id := Id, operation := Op}}) ->
?TRY_PARSE_ID(Id, case operation_to_conf_req(Op) of
invalid -> {404, error_msg('BAD_ARG', <<"invalid operation">>)};
UpReq ->
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
UpReq, #{override_to => cluster}) of
{ok, _} -> {200};
{error, {pre_config_update, _, bridge_not_found}} ->
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
{error, Reason} ->
{500, error_msg('UNKNOWN_ERROR', Reason)}
end
end).
operation_to_conf_req(<<"start">>) -> start;
operation_to_conf_req(<<"stop">>) -> stop;
operation_to_conf_req(<<"restart">>) -> restart;
operation_to_conf_req(_) -> invalid.
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
Conf1 = maps:without([<<"type">>, <<"name">>], Conf),

View File

@ -19,16 +19,35 @@
-export([start/2, stop/1]).
-export([ pre_config_update/3
]).
-define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())).
-define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_bridge_sup:start_link(),
ok = emqx_bridge:load(),
ok = emqx_bridge:load_hook(),
emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
{ok, Sup}.
stop(_State) ->
emqx_conf:remove_handler(emqx_bridge:config_key_path()),
emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH),
emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH),
ok = emqx_bridge:unload_hook(),
ok.
-define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart).
pre_config_update(_, Oper, undefined) ?IS_OPER(Oper) ->
{error, bridge_not_found};
pre_config_update(_, Oper, OldConfig) ?IS_OPER(Oper) ->
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
pre_config_update(_, Conf, _OldConfig) ->
{ok, Conf}.
%% internal functions
operation_to_enable(start) -> true;
operation_to_enable(stop) -> false;
operation_to_enable(restart) -> true.

View File

@ -79,7 +79,13 @@ fields("get") ->
] ++ fields("post").
basic_config() ->
proplists:delete(base_url, emqx_connector_http:fields(config)).
[ {enable,
mk(boolean(),
#{ desc =>"Enable or disable this bridge"
, default => true
})}
]
++ proplists:delete(base_url, emqx_connector_http:fields(config)).
%%======================================================================================
id_field() ->

View File

@ -11,28 +11,30 @@
roots() -> [].
fields("ingress") ->
[ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
, emqx_bridge_schema:connector_name()
] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
[ emqx_bridge_schema:direction_field(ingress, emqx_connector_mqtt_schema:ingress_desc())
]
++ emqx_bridge_schema:common_bridge_fields()
++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress") ->
[ direction(egress, emqx_connector_mqtt_schema:egress_desc())
, emqx_bridge_schema:connector_name()
] ++ emqx_connector_mqtt_schema:fields("egress");
[ emqx_bridge_schema:direction_field(egress, emqx_connector_mqtt_schema:egress_desc())
]
++ emqx_bridge_schema:common_bridge_fields()
++ emqx_connector_mqtt_schema:fields("egress");
fields("post_ingress") ->
[ type_field()
, name_field()
] ++ fields("ingress");
] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") ->
[ type_field()
, name_field()
] ++ fields("egress");
] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") ->
fields("ingress");
proplists:delete(enable, fields("ingress"));
fields("put_egress") ->
fields("egress");
proplists:delete(enable, fields("egress"));
fields("get_ingress") ->
[ id_field()
@ -42,13 +44,6 @@ fields("get_egress") ->
] ++ fields("post_egress").
%%======================================================================================
direction(Dir, Desc) ->
{direction, mk(Dir,
#{ nullable => false
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>"
++ Desc
})}.
id_field() ->
{id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}.

View File

@ -11,7 +11,8 @@
, post_request/0
]).
-export([ connector_name/0
-export([ common_bridge_fields/0
, direction_field/2
]).
%%======================================================================================
@ -24,17 +25,6 @@
get_response() ->
http_schema("get").
connector_name() ->
{connector,
mk(binary(),
#{ nullable => false
, desc =>"""
The connector name to be used for this bridge.
Connectors are configured as 'connectors.{type}.{name}',
for example 'connectors.http.mybridge'.
"""
})}.
put_request() ->
http_schema("put").
@ -49,6 +39,30 @@ http_schema(Method) ->
hoconsc:union([ref(emqx_bridge_http_schema, Method)
| Schemas]).
common_bridge_fields() ->
[ {enable,
mk(boolean(),
#{ desc =>"Enable or disable this bridge"
, default => true
})}
, {connector,
mk(binary(),
#{ nullable => false
, desc =>"""
The connector name to be used for this bridge.
Connectors are configured as 'connectors.{type}.{name}',
for example 'connectors.http.mybridge'.
"""
})}
].
direction_field(Dir, Desc) ->
{direction, mk(Dir,
#{ nullable => false
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>"
++ Desc
})}.
%%======================================================================================
%% For config files
roots() -> [bridges].

View File

@ -234,37 +234,27 @@ t_start_stop_bridges(_) ->
, <<"url">> := URL1
}, jsx:decode(Bridge)),
%% stop it
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
<<"">>),
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "start"]),
<<"">>),
{ok, 200, <<>>} = request(post, operation_path(start), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
<<"">>),
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
<<"">>),
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
%% restart a stopped bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
<<"">>),
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
@ -306,3 +296,5 @@ auth_header_() ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
operation_path(Oper) ->
uri(["bridges", ?BRIDGE_ID, "operation", Oper]).

View File

@ -220,8 +220,8 @@ schema("/connectors/:id") ->
case emqx_connector:update(ConnType, ConnName,
maps:without([<<"type">>, <<"name">>], Params)) of
{ok, #{raw_config := RawConf}} ->
{201, RawConf#{<<"id">> =>
emqx_connector:connector_id(ConnType, ConnName)}};
Id = emqx_connector:connector_id(ConnType, ConnName),
{201, format_resp(Id, RawConf)};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end
end.
@ -229,7 +229,7 @@ schema("/connectors/:id") ->
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of
{ok, Conf} -> {200, Conf#{<<"id">> => Id}};
{ok, Conf} -> {200, format_resp(Id, Conf)};
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end);
@ -239,7 +239,8 @@ schema("/connectors/:id") ->
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:update(ConnType, ConnName, Params) of
{ok, #{raw_config := RawConf}} -> {200, RawConf#{<<"id">> => Id}};
{ok, #{raw_config := RawConf}} ->
{200, format_resp(Id, RawConf)};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end;
{error, not_found} ->
@ -263,5 +264,12 @@ error_msg(Code, Msg) when is_binary(Msg) ->
error_msg(Code, Msg) ->
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
format_resp(ConnId, RawConf) ->
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
RawConf#{
<<"id">> => ConnId,
<<"num_of_bridges">> => NumOfBridges
}.
bin(S) when is_list(S) ->
list_to_binary(S).

View File

@ -54,11 +54,14 @@ fields("config") ->
emqx_connector_mqtt_schema:fields("config");
fields("get") ->
[{id, mk(binary(),
[ {id, mk(binary(),
#{ desc => "The connector Id"
, example => <<"mqtt:my_mqtt_connector">>
})}]
++ fields("post");
})}
, {num_of_bridges, mk(integer(),
#{ desc => "The current number of bridges that are using this connector"
})}
] ++ fields("post");
fields("put") ->
emqx_connector_mqtt_schema:fields("connector");

View File

@ -13,6 +13,7 @@
, post_request/0
]).
%% the config for http bridges do not need connectors
-define(CONN_TYPES, [mqtt]).
%%======================================================================================

View File

@ -199,9 +199,9 @@ t_mqtt_conn_bridge_ingress(_) ->
, <<"name">> => ?CONNECTR_NAME
}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
, <<"num_of_bridges">> := 0
, <<"username">> := User1
, <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">>
@ -216,7 +216,6 @@ t_mqtt_conn_bridge_ingress(_) ->
<<"name">> => ?BRIDGE_NAME_INGRESS
}),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
, <<"type">> := <<"mqtt">>
, <<"status">> := <<"connected">>
@ -246,6 +245,12 @@ t_mqtt_conn_bridge_ingress(_) ->
false
end),
%% get the connector by id, verify the num_of_bridges now is 1
{ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"num_of_bridges">> := 1
}, jsx:decode(Connector1Str)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),