refactor(bridges): add POST /bridges for creating

This commit is contained in:
Shawn 2021-11-16 13:35:29 +08:00
parent c4668b4047
commit 4dac90f4a7
3 changed files with 289 additions and 115 deletions

View File

@ -65,7 +65,8 @@ load_hook() ->
end, maps:to_list(Bridges)).
load_hook(#{from_local_topic := _}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
ok;
load_hook(_Conf) -> ok.
unload_hook() ->
@ -98,12 +99,13 @@ bridge_type(emqx_connector_http) -> http.
post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_confs(NewConf, OldConf),
perform_bridge_changes([
Result = perform_bridge_changes([
{fun remove/3, Removed},
{fun create/3, Added},
{fun update/3, Updated}
]),
reload_hook().
ok = reload_hook(),
Result.
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).

View File

@ -19,23 +19,40 @@
-export([api_spec/0]).
-export([ list_bridges/2
-export([ list_create_bridges_in_cluster/2
, list_local_bridges/1
, crud_bridges_cluster/2
, crud_bridges/3
, crud_bridges_in_cluster/2
, crud_local_bridges/4
, manage_bridges/2
]).
-define(TYPES, [mqtt]).
-define(BRIDGE(N, T, C), #{<<"id">> => N, <<"type">> => T, <<"config">> => C}).
-define(TYPES, [mqtt, http]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
{BridgeType, BridgeName} -> EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 102, message => <<"invalid_bridge_id: ", Id0/binary>>}}
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
". Bridge Ids must be of format <bridge_type>:<name>">>}}
end).
-define(METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{
success => SUCC,
failed => FAILED,
rate => RATE,
rate_last5m => RATE_5,
rate_max => RATE_MAX
}).
-define(metrics(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{
success := SUCC,
failed := FAILED,
rate := RATE,
rate_last5m := RATE_5,
rate_max := RATE_MAX
}).
req_schema() ->
Schema = [
case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of
@ -47,14 +64,50 @@ req_schema() ->
|| T <- ?TYPES],
#{'oneOf' => Schema}.
node_schema() ->
#{type => string, example => "emqx@127.0.0.1"}.
status_schema() ->
#{type => string, enum => [connected, disconnected]}.
metrics_schema() ->
#{ type => object
, properties => #{
success => #{type => integer, example => "0"},
failed => #{type => integer, example => "0"},
rate => #{type => number, format => float, example => "0.0"},
rate_last5m => #{type => number, format => float, example => "0.0"},
rate_max => #{type => number, format => float, example => "0.0"}
}
}.
per_node_schema(Key, Schema) ->
#{
type => array,
items => #{
type => object,
properties => #{
node => node_schema(),
Key => Schema
}
}
}.
resp_schema() ->
#{'oneOf' := Schema} = req_schema(),
AddMetadata = fun(Prop) ->
Prop#{status => #{type => string, enum => [connected, disconnected, connecting]},
id => #{type => string},
Prop#{status => status_schema(),
node_status => per_node_schema(status, status_schema()),
metrics => metrics_schema(),
node_metrics => per_node_schema(metrics, metrics_schema()),
id => #{type => string, example => "http:my_http_bridge"},
bridge_type => #{type => string, enum => ?TYPES},
node => #{type => string}}
node => node_schema()
}
end,
more_props_resp_schema(AddMetadata).
more_props_resp_schema(AddMetadata) ->
#{oneOf := Schema} = req_schema(),
Schema1 = [S#{properties => AddMetadata(Prop)}
|| S = #{properties := Prop} <- Schema],
#{'oneOf' => Schema1}.
@ -66,6 +119,10 @@ bridge_apis() ->
[list_all_bridges_api(), crud_bridges_apis(), operation_apis()].
list_all_bridges_api() ->
ReqSchema = more_props_resp_schema(fun(Prop) ->
Prop#{id => #{type => string, required => true}}
end),
RespSchema = resp_schema(),
Metadata = #{
get => #{
description => <<"List all created bridges">>,
@ -73,9 +130,18 @@ list_all_bridges_api() ->
<<"200">> => emqx_mgmt_util:array_schema(resp_schema(),
<<"A list of the bridges">>)
}
},
post => #{
description => <<"Create a new bridge">>,
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
responses => #{
<<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>),
<<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>,
['UPDATE_FAILED'])
}
}
},
{"/bridges/", Metadata, list_bridges}.
{"/bridges/", Metadata, list_create_bridges_in_cluster}.
crud_bridges_apis() ->
ReqSchema = req_schema(),
@ -91,7 +157,7 @@ crud_bridges_apis() ->
}
},
put => #{
description => <<"Create or update a bridge">>,
description => <<"Update a bridge">>,
parameters => [param_path_id()],
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
responses => #{
@ -109,7 +175,7 @@ crud_bridges_apis() ->
}
}
},
{"/bridges/:id", Metadata, crud_bridges_cluster}.
{"/bridges/:id", Metadata, crud_bridges_in_cluster}.
operation_apis() ->
Metadata = #{
@ -153,53 +219,73 @@ param_path_operation()->
example => restart
}.
list_bridges(get, _Params) ->
{200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
crud_bridges_in_cluster(post, Id, maps:remove(<<"id">>, Conf));
list_create_bridges_in_cluster(get, _Params) ->
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_local_bridges(Node) when Node =:= node() ->
[format_resp(Data) || Data <- emqx_bridge:list()];
list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]).
crud_bridges_cluster(Method, Params) ->
Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of
crud_bridges_in_cluster(Method, #{bindings := #{id := Id}, body := Body}) ->
crud_bridges_in_cluster(Method, Id, Body).
crud_bridges_in_cluster(Method, Id, Body) ->
Results = [crud_local_bridges(Node, Method, Id, Body) || Node <- mria_mnesia:running_nodes()],
Filter = fun ({200}) -> false;
({Code, _}) when Code == 200; Code == 201 -> false;
(_) -> true
end,
case lists:filter(Filter, Results) of
[] ->
case Results of
[{200} | _] -> {200};
_ -> {200, [Res || {200, Res} <- Results]}
[{Code, _} | _] when Code == 200; Code == 201 ->
{Code, format_bridge_info([Bridge || {_, Bridge} <- Results])}
end;
Errors ->
hd(Errors)
end.
crud_bridges(Node, Method, Params) when Node =/= node() ->
rpc_call(Node, crud_bridges, [Node, Method, Params]);
crud_local_bridges(Node, Method, Id, Body) when Node =/= node() ->
rpc_call(Node, crud_local_bridges, [Node, Method, Id, Body]);
crud_bridges(_, get, #{bindings := #{id := Id}}) ->
crud_local_bridges(_, get, Id, _Body) ->
?TRY_PARSE_ID(Id, case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, Data} -> {200, format_resp(Data)};
{error, not_found} ->
{404, #{code => 102, message => <<"not_found: ", Id/binary>>}}
end);
crud_bridges(_, put, #{bindings := #{id := Id}, body := Conf}) ->
crud_local_bridges(_, post, Id, Conf) ->
?TRY_PARSE_ID(Id,
case emqx:update_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
#{rawconf_with_defaults => true}) of
{ok, #{raw_config := RawConf, post_config_update := #{emqx_bridge := Data}}} ->
{200, format_resp(#{id => Id, raw_config => RawConf, resource_data => Data})};
{ok, _} -> %% the bridge already exits
{ok, Data} = emqx_bridge:lookup(BridgeType, BridgeName),
{200, format_resp(Data)};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
{error, not_found} ->
case ensure_bridge(Id, BridgeType, BridgeName, Conf) of
{ok, Resp} -> {201, Resp};
{error, Error} -> {400, Error}
end
end);
crud_bridges(_, delete, #{bindings := #{id := Id}}) ->
crud_local_bridges(_, put, Id, Conf) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
case ensure_bridge(Id, BridgeType, BridgeName, Conf) of
{ok, Resp} -> {200, Resp};
{error, Error} -> {400, Error}
end;
{error, not_found} ->
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
end);
crud_local_bridges(_, delete, Id, _Body) ->
?TRY_PARSE_ID(Id,
case emqx:remove_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName]) of
{ok, _} -> {200};
{ok, _} -> {204};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
@ -218,13 +304,68 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
ensure_bridge(Id, BridgeType, BridgeName, Conf) ->
case emqx:update_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
#{rawconf_with_defaults => true}) of
{ok, #{raw_config := RawConf, post_config_update := #{emqx_bridge := Data}}} ->
{ok, format_resp(#{id => Id, raw_config => RawConf, resource_data => Data})};
{error, Reason} ->
{error, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end.
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
lists:foldl(fun(#{id := Id}, Acc) ->
Bridges = pick_bridges_by_id(Id, BridgesAllNodes),
[format_bridge_info(Bridges) | Acc]
end, [], BridgesFirstNode).
pick_bridges_by_id(Id, BridgesAllNodes) ->
lists:foldl(fun(BridgesOneNode, Acc) ->
[BridgeInfo] = [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id],
[BridgeInfo | Acc]
end, [], BridgesAllNodes).
format_bridge_info([FirstBridge | _] = Bridges) ->
Res = maps:remove(node, FirstBridge),
NodeStatus = collect_status(Bridges),
NodeMetrics = collect_metrics(Bridges),
Res#{ status => aggregate_status(NodeStatus)
, node_status => NodeStatus
, metrics => aggregate_metrics(NodeMetrics)
, node_metrics => NodeMetrics
}.
collect_status(Bridges) ->
[maps:with([node, status], B) || B <- Bridges].
aggregate_status(AllStatus) ->
AllConnected = lists:all(fun (#{status := connected}) -> true;
(_) -> false
end, AllStatus),
case AllConnected of
true -> connected;
false -> disconnected
end.
collect_metrics(Bridges) ->
[maps:with([node, metrics], B) || B <- Bridges].
aggregate_metrics(AllMetrics) ->
InitMetrics = ?METRICS(0,0,0,0,0),
lists:foldl(fun(#{metrics := ?metrics(Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
?metrics(Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
?METRICS(Succ1 + Succ0, Failed1 + Failed0,
Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
end, InitMetrics, AllMetrics).
format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) ->
IsConnected = fun(started) -> connected; (_) -> disconnected end,
RawConf#{
id => Id,
node => node(),
bridge_type => emqx_bridge:bridge_type(Mod),
status => IsConnected(Status)
status => IsConnected(Status),
metrics => ?METRICS(0,0,0,0,0)
}.
rpc_call(Node, Fun, Args) ->

View File

@ -21,6 +21,22 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>).
-define(TEST_ID, <<"http:test_bridge">>).
-define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))).
-define(HTTP_BRIDGE(URL),
#{
<<"url">> => URL,
<<"from_local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>,
<<"headers">> => #{
<<"content-type">> => <<"application/json">>
}
}).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -56,23 +72,6 @@ init_per_testcase(_, Config) ->
end_per_testcase(_, _Config) ->
ok.
-define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))).
-define(HTTP_BRIDGE(URL),
#{
<<"url">> => URL,
<<"from_local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>,
<<"headers">> => #{
<<"content-type">> => <<"application/json">>
}
}).
%%------------------------------------------------------------------------------
%% HTTP server for testing
%%------------------------------------------------------------------------------
@ -124,105 +123,137 @@ handle_fun_200_ok(Conn) ->
%% Testcases
%%------------------------------------------------------------------------------
t_crud_apis(_) ->
t_http_crud_apis(_) ->
Port = start_http_server(fun handle_fun_200_ok/1),
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a http bridge now
%% PUT /bridges/:id will create or update a bridge
%% then we add a http bridge, using PUT
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
{ok, 200, Bridge} = request(put, uri(["bridges", "http:test_bridge"]),
?HTTP_BRIDGE(URL1)),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch([ #{ <<"id">> := <<"http:test_bridge">>
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node">> := _
, <<"url">> := URL1
}], jsx:decode(Bridge)),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL1
}, jsx:decode(Bridge)),
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge already exists">>
}, jsx:decode(RetMsg)),
%% update the request-path of the bridge
URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(put, uri(["bridges", "http:test_bridge"]),
{ok, 200, Bridge2} = request(put, uri(["bridges", ?TEST_ID]),
?HTTP_BRIDGE(URL2)),
?assertMatch([ #{ <<"id">> := <<"http:test_bridge">>
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node">> := _
, <<"url">> := URL2
}], jsx:decode(Bridge2)),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL2
}, jsx:decode(Bridge2)),
%% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
?assertMatch([ #{ <<"id">> := <<"http:test_bridge">>
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node">> := _
, <<"url">> := URL2
}], jsx:decode(Bridge2Str)),
?assertMatch([#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL2
}], jsx:decode(Bridge2Str)),
%% get the bridge by id
{ok, 200, Bridge3Str} = request(get, uri(["bridges", "http:test_bridge"]), []),
?assertMatch([#{ <<"id">> := <<"http:test_bridge">>
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node">> := _
, <<"url">> := URL2
}], jsx:decode(Bridge3Str)),
{ok, 200, Bridge3Str} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL2
}, jsx:decode(Bridge3Str)),
%% delete the bridge
{ok,200,<<>>} = request(delete, uri(["bridges", "http:test_bridge"]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% update a deleted bridge returns an error
{ok, 404, ErrMsg2} = request(put, uri(["bridges", ?TEST_ID]),
?HTTP_BRIDGE(URL2)),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge not found">>
}, jsx:decode(ErrMsg2)),
ok.
t_start_stop_bridges(_) ->
Port = start_http_server(fun handle_fun_200_ok/1),
URL1 = ?URL(Port, "abc"),
{ok, 200, Bridge} = request(put, uri(["bridges", "http:test_bridge"]), ?HTTP_BRIDGE(URL1)),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
%ct:pal("the bridge ==== ~p", [Bridge]),
?assertMatch( [#{ <<"id">> := <<"http:test_bridge">>
, <<"bridge_type">> := <<"http">>
, <<"status">> := <<"connected">>
, <<"node">> := _
, <<"url">> := URL1
}], jsx:decode(Bridge)),
?assertMatch(
#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL1
}, jsx:decode(Bridge)),
%% stop it
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", "http:test_bridge", "operation", "stop"]),
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
<<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", "http:test_bridge"]), []),
?assertMatch([#{ <<"id">> := <<"http:test_bridge">>
, <<"status">> := <<"disconnected">>
}], jsx:decode(Bridge2)),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", "http:test_bridge", "operation", "start"]),
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "start"]),
<<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", "http:test_bridge"]), []),
?assertMatch([#{ <<"id">> := <<"http:test_bridge">>
, <<"status">> := <<"connected">>
}], jsx:decode(Bridge3)),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", "http:test_bridge", "operation", "restart"]),
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
<<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", "http:test_bridge"]), []),
?assertMatch([#{ <<"id">> := <<"http:test_bridge">>
, <<"status">> := <<"connected">>
}], jsx:decode(Bridge3)),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", "http:test_bridge", "operation", "stop"]),
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
<<"">>),
%% restart a stopped bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", "http:test_bridge", "operation", "restart"]),
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
<<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", "http:test_bridge"]), []),
?assertMatch([#{ <<"id">> := <<"http:test_bridge">>
, <<"status">> := <<"connected">>
}], jsx:decode(Bridge4)).
{ok, 200, Bridge4} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%--------------------------------------------------------------------
%% HTTP Request