refactor(rule): generate swagger from hocon schema for /bridges
This commit is contained in:
parent
affe69afd6
commit
56d46c80eb
|
@ -17,23 +17,32 @@
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
-export([api_spec/0]).
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
|
||||||
-export([ list_create_bridges_in_cluster/2
|
-import(hoconsc, [mk/2, array/1, enum/1]).
|
||||||
, list_local_bridges/1
|
|
||||||
, crud_bridges_in_cluster/2
|
%% Swagger specs from hocon schema
|
||||||
, manage_bridges/2
|
-export([api_spec/0, paths/0, schema/1, namespace/0]).
|
||||||
|
|
||||||
|
%% API callbacks
|
||||||
|
-export(['/bridges'/2, '/bridges/:id'/2,
|
||||||
|
'/nodes/:node/bridges/:id/operation/:operation'/2]).
|
||||||
|
|
||||||
|
-export([ list_local_bridges/1
|
||||||
, lookup_from_local_node/2
|
, lookup_from_local_node/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(TYPES, [mqtt, http]).
|
-define(TYPES, [mqtt, http]).
|
||||||
|
|
||||||
|
-define(CONN_TYPES, [mqtt]).
|
||||||
|
|
||||||
-define(TRY_PARSE_ID(ID, EXPR),
|
-define(TRY_PARSE_ID(ID, EXPR),
|
||||||
try emqx_bridge:parse_bridge_id(Id) of
|
try emqx_bridge:parse_bridge_id(Id) of
|
||||||
{BridgeType, BridgeName} -> EXPR
|
{BridgeType, BridgeName} -> EXPR
|
||||||
catch
|
catch
|
||||||
error:{invalid_bridge_id, Id0} ->
|
error:{invalid_bridge_id, Id0} ->
|
||||||
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
|
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
|
||||||
". Bridge ID must be of format 'bridge_type:name'">>}}
|
". Bridge Ids must be of format {type}:{name}">>}}
|
||||||
end).
|
end).
|
||||||
|
|
||||||
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
|
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
|
||||||
|
@ -53,184 +62,221 @@
|
||||||
rate_max := RATE_MAX
|
rate_max := RATE_MAX
|
||||||
}).
|
}).
|
||||||
|
|
||||||
req_schema() ->
|
namespace() -> "bridge".
|
||||||
Schema = [
|
|
||||||
case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of
|
|
||||||
%% the bridge is not configured, so we have no method to get the schema
|
|
||||||
[] -> #{};
|
|
||||||
[{_K, Conf} | _] ->
|
|
||||||
emqx_mgmt_api_configs:gen_schema(Conf)
|
|
||||||
end
|
|
||||||
|| T <- ?TYPES],
|
|
||||||
#{'oneOf' => Schema}.
|
|
||||||
|
|
||||||
node_schema() ->
|
|
||||||
#{type => string, example => "emqx@127.0.0.1"}.
|
|
||||||
|
|
||||||
status_schema() ->
|
|
||||||
#{type => string, enum => [connected, disconnected]}.
|
|
||||||
|
|
||||||
metrics_schema() ->
|
|
||||||
#{ type => object
|
|
||||||
, properties => #{
|
|
||||||
matched => #{type => integer, example => "0"},
|
|
||||||
success => #{type => integer, example => "0"},
|
|
||||||
failed => #{type => integer, example => "0"},
|
|
||||||
rate => #{type => number, format => float, example => "0.0"},
|
|
||||||
rate_last5m => #{type => number, format => float, example => "0.0"},
|
|
||||||
rate_max => #{type => number, format => float, example => "0.0"}
|
|
||||||
}
|
|
||||||
}.
|
|
||||||
|
|
||||||
per_node_schema(Key, Schema) ->
|
|
||||||
#{
|
|
||||||
type => array,
|
|
||||||
items => #{
|
|
||||||
type => object,
|
|
||||||
properties => #{
|
|
||||||
node => node_schema(),
|
|
||||||
Key => Schema
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}.
|
|
||||||
|
|
||||||
resp_schema() ->
|
|
||||||
AddMetadata = fun(Prop) ->
|
|
||||||
Prop#{status => status_schema(),
|
|
||||||
node_status => per_node_schema(status, status_schema()),
|
|
||||||
metrics => metrics_schema(),
|
|
||||||
node_metrics => per_node_schema(metrics, metrics_schema()),
|
|
||||||
id => #{type => string, example => "http:my_http_bridge"},
|
|
||||||
bridge_type => #{type => string, enum => ?TYPES},
|
|
||||||
node => node_schema()
|
|
||||||
}
|
|
||||||
end,
|
|
||||||
more_props_resp_schema(AddMetadata).
|
|
||||||
|
|
||||||
more_props_resp_schema(AddMetadata) ->
|
|
||||||
#{'oneOf' := Schema} = req_schema(),
|
|
||||||
Schema1 = [S#{properties => AddMetadata(Prop)}
|
|
||||||
|| S = #{properties := Prop} <- Schema],
|
|
||||||
#{'oneOf' => Schema1}.
|
|
||||||
|
|
||||||
api_spec() ->
|
api_spec() ->
|
||||||
{bridge_apis(), []}.
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
|
||||||
|
|
||||||
bridge_apis() ->
|
paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"].
|
||||||
[list_all_bridges_api(), crud_bridges_apis(), operation_apis()].
|
|
||||||
|
|
||||||
list_all_bridges_api() ->
|
error_schema(Code, Message) ->
|
||||||
ReqSchema = more_props_resp_schema(fun(Prop) ->
|
[ {code, mk(string(), #{example => Code})}
|
||||||
Prop#{id => #{type => string, required => true}}
|
, {message, mk(string(), #{example => Message})}
|
||||||
end),
|
].
|
||||||
RespSchema = resp_schema(),
|
|
||||||
Metadata = #{
|
get_response_body_schema() ->
|
||||||
|
emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
|
||||||
|
bridge_info_examples(get)).
|
||||||
|
|
||||||
|
param_path_node() ->
|
||||||
|
path_param(node, binary(), atom_to_binary(node(), utf8)).
|
||||||
|
|
||||||
|
param_path_operation() ->
|
||||||
|
path_param(operation, enum([start, stop, restart]), <<"start">>).
|
||||||
|
|
||||||
|
param_path_id() ->
|
||||||
|
path_param(id, binary(), <<"http:my_http_bridge">>).
|
||||||
|
|
||||||
|
path_param(Name, Type, Example) ->
|
||||||
|
{Name, mk(Type,
|
||||||
|
#{ in => path
|
||||||
|
, required => true
|
||||||
|
, example => Example
|
||||||
|
})}.
|
||||||
|
|
||||||
|
bridge_info_array_example(Method) ->
|
||||||
|
[Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
|
||||||
|
|
||||||
|
bridge_info_examples(Method) ->
|
||||||
|
maps:merge(conn_bridge_examples(Method), #{
|
||||||
|
<<"http_bridge">> => #{
|
||||||
|
summary => <<"HTTP Bridge">>,
|
||||||
|
value => info_example(http, awesome, Method)
|
||||||
|
}
|
||||||
|
}).
|
||||||
|
|
||||||
|
conn_bridge_examples(Method) ->
|
||||||
|
lists:foldl(fun(Type, Acc) ->
|
||||||
|
SType = atom_to_list(Type),
|
||||||
|
KeyIngress = bin(SType ++ "_ingress"),
|
||||||
|
KeyEgress = bin(SType ++ "_egress"),
|
||||||
|
maps:merge(Acc, #{
|
||||||
|
KeyIngress => #{
|
||||||
|
summary => bin(string:uppercase(SType) ++ " Ingress Bridge"),
|
||||||
|
value => info_example(Type, ingress, Method)
|
||||||
|
},
|
||||||
|
KeyEgress => #{
|
||||||
|
summary => bin(string:uppercase(SType) ++ " Egress Bridge"),
|
||||||
|
value => info_example(Type, egress, Method)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
end, #{}, ?CONN_TYPES).
|
||||||
|
|
||||||
|
info_example(Type, Direction, Method) ->
|
||||||
|
maps:merge(info_example_basic(Type, Direction),
|
||||||
|
method_example(Type, Direction, Method)).
|
||||||
|
|
||||||
|
method_example(Type, Direction, get) ->
|
||||||
|
SType = atom_to_list(Type),
|
||||||
|
SDir = atom_to_list(Direction),
|
||||||
|
SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
|
||||||
|
#{
|
||||||
|
id => bin(SType ++ ":" ++ SName),
|
||||||
|
type => bin(SType),
|
||||||
|
name => bin(SName)
|
||||||
|
};
|
||||||
|
method_example(Type, Direction, post) ->
|
||||||
|
SType = atom_to_list(Type),
|
||||||
|
SDir = atom_to_list(Direction),
|
||||||
|
SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
|
||||||
|
#{
|
||||||
|
type => bin(SType),
|
||||||
|
name => bin(SName)
|
||||||
|
};
|
||||||
|
method_example(_Type, _Direction, put) ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
|
info_example_basic(http, _) ->
|
||||||
|
#{
|
||||||
|
url => <<"http://localhost:9901/messages/${topic}">>,
|
||||||
|
request_timeout => <<"30s">>,
|
||||||
|
connect_timeout => <<"30s">>,
|
||||||
|
max_retries => 3,
|
||||||
|
retry_interval => <<"10s">>,
|
||||||
|
pool_type => <<"random">>,
|
||||||
|
pool_size => 4,
|
||||||
|
enable_pipelining => true,
|
||||||
|
ssl => #{enable => false},
|
||||||
|
from_local_topic => <<"emqx_http/#">>,
|
||||||
|
method => post,
|
||||||
|
body => <<"${payload}">>
|
||||||
|
};
|
||||||
|
info_example_basic(mqtt, ingress) ->
|
||||||
|
#{
|
||||||
|
connector => <<"mqtt:my_mqtt_connector">>,
|
||||||
|
direction => ingress,
|
||||||
|
from_remote_topic => <<"aws/#">>,
|
||||||
|
subscribe_qos => 1,
|
||||||
|
to_local_topic => <<"from_aws/${topic}">>,
|
||||||
|
payload => <<"${payload}">>,
|
||||||
|
qos => <<"${qos}">>,
|
||||||
|
retain => <<"${retain}">>
|
||||||
|
};
|
||||||
|
info_example_basic(mqtt, egress) ->
|
||||||
|
#{
|
||||||
|
connector => <<"mqtt:my_mqtt_connector">>,
|
||||||
|
direction => egress,
|
||||||
|
from_local_topic => <<"emqx/#">>,
|
||||||
|
to_remote_topic => <<"from_emqx/${topic}">>,
|
||||||
|
payload => <<"${payload}">>,
|
||||||
|
qos => 1,
|
||||||
|
retain => false
|
||||||
|
}.
|
||||||
|
|
||||||
|
schema("/bridges") ->
|
||||||
|
#{
|
||||||
|
operationId => '/bridges',
|
||||||
get => #{
|
get => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"List Bridges">>,
|
||||||
description => <<"List all created bridges">>,
|
description => <<"List all created bridges">>,
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"200">> => emqx_mgmt_util:array_schema(resp_schema(),
|
200 => emqx_dashboard_swagger:schema_with_example(
|
||||||
<<"A list of the bridges">>)
|
array(emqx_bridge_schema:get_response()),
|
||||||
|
bridge_info_array_example(get))
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
post => #{
|
post => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"Create Bridge">>,
|
||||||
description => <<"Create a new bridge">>,
|
description => <<"Create a new bridge">>,
|
||||||
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
|
requestBody => emqx_dashboard_swagger:schema_with_examples(
|
||||||
|
emqx_bridge_schema:post_request(),
|
||||||
|
bridge_info_examples(post)),
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>),
|
201 => get_response_body_schema(),
|
||||||
<<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>,
|
400 => error_schema('BAD_ARG', "Create bridge failed")
|
||||||
['UPDATE_FAILED'])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
};
|
||||||
{"/bridges/", Metadata, list_create_bridges_in_cluster}.
|
|
||||||
|
|
||||||
crud_bridges_apis() ->
|
schema("/bridges/:id") ->
|
||||||
ReqSchema = req_schema(),
|
#{
|
||||||
RespSchema = resp_schema(),
|
operationId => '/bridges/:id',
|
||||||
Metadata = #{
|
|
||||||
get => #{
|
get => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"Get Bridge">>,
|
||||||
description => <<"Get a bridge by Id">>,
|
description => <<"Get a bridge by Id">>,
|
||||||
parameters => [param_path_id()],
|
parameters => [param_path_id()],
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"200">> => emqx_mgmt_util:array_schema(RespSchema,
|
200 => get_response_body_schema(),
|
||||||
<<"The details of the bridge">>),
|
404 => error_schema('NOT_FOUND', "Bridge not found")
|
||||||
<<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
put => #{
|
put => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"Update Bridge">>,
|
||||||
description => <<"Update a bridge">>,
|
description => <<"Update a bridge">>,
|
||||||
parameters => [param_path_id()],
|
parameters => [param_path_id()],
|
||||||
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
|
requestBody => emqx_dashboard_swagger:schema_with_examples(
|
||||||
|
emqx_bridge_schema:put_request(),
|
||||||
|
bridge_info_examples(put)),
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>),
|
200 => get_response_body_schema(),
|
||||||
<<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>,
|
400 => error_schema('BAD_ARG', "Update bridge failed")
|
||||||
['UPDATE_FAILED'])
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
delete => #{
|
delete => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"Delete Bridge">>,
|
||||||
description => <<"Delete a bridge">>,
|
description => <<"Delete a bridge">>,
|
||||||
parameters => [param_path_id()],
|
parameters => [param_path_id()],
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"204">> => emqx_mgmt_util:schema(<<"Bridge deleted">>),
|
204 => <<"Bridge deleted">>
|
||||||
<<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
};
|
||||||
{"/bridges/:id", Metadata, crud_bridges_in_cluster}.
|
|
||||||
|
|
||||||
operation_apis() ->
|
schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
Metadata = #{
|
#{
|
||||||
|
operationId => '/nodes/:node/bridges/:id/operation/:operation',
|
||||||
post => #{
|
post => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"Start/Stop/Restart Bridge">>,
|
||||||
description => <<"Start/Stop/Restart bridges on a specific node">>,
|
description => <<"Start/Stop/Restart bridges on a specific node">>,
|
||||||
parameters => [
|
parameters => [
|
||||||
param_path_node(),
|
param_path_node(),
|
||||||
param_path_id(),
|
param_path_id(),
|
||||||
param_path_operation()],
|
param_path_operation()
|
||||||
|
],
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>,
|
500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
|
||||||
['INTERNAL_ERROR']),
|
200 => <<"Operation success">>
|
||||||
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
|
}
|
||||||
{"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}.
|
}
|
||||||
|
|
||||||
param_path_node() ->
|
|
||||||
#{
|
|
||||||
name => node,
|
|
||||||
in => path,
|
|
||||||
schema => #{type => string},
|
|
||||||
required => true,
|
|
||||||
example => node()
|
|
||||||
}.
|
}.
|
||||||
|
|
||||||
param_path_id() ->
|
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
|
||||||
#{
|
BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
|
||||||
name => id,
|
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
||||||
in => path,
|
{ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
|
||||||
schema => #{type => string},
|
{error, not_found} ->
|
||||||
required => true
|
case ensure_bridge_created(BridgeType, BridgeName, Conf) of
|
||||||
}.
|
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
|
||||||
|
{error, Error} -> {400, Error}
|
||||||
param_path_operation()->
|
end
|
||||||
#{
|
end;
|
||||||
name => operation,
|
'/bridges'(get, _Params) ->
|
||||||
in => path,
|
|
||||||
required => true,
|
|
||||||
schema => #{
|
|
||||||
type => string,
|
|
||||||
enum => [start, stop, restart]},
|
|
||||||
example => restart
|
|
||||||
}.
|
|
||||||
|
|
||||||
list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
|
|
||||||
?TRY_PARSE_ID(Id,
|
|
||||||
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
|
||||||
{ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
|
|
||||||
{error, not_found} ->
|
|
||||||
case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of
|
|
||||||
ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201);
|
|
||||||
{error, Error} -> {400, Error}
|
|
||||||
end
|
|
||||||
end);
|
|
||||||
list_create_bridges_in_cluster(get, _Params) ->
|
|
||||||
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
|
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
|
||||||
|
|
||||||
list_local_bridges(Node) when Node =:= node() ->
|
list_local_bridges(Node) when Node =:= node() ->
|
||||||
|
@ -238,22 +284,22 @@ list_local_bridges(Node) when Node =:= node() ->
|
||||||
list_local_bridges(Node) ->
|
list_local_bridges(Node) ->
|
||||||
rpc_call(Node, list_local_bridges, [Node]).
|
rpc_call(Node, list_local_bridges, [Node]).
|
||||||
|
|
||||||
crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) ->
|
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
|
||||||
?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200));
|
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
|
||||||
|
|
||||||
crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) ->
|
'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) ->
|
||||||
?TRY_PARSE_ID(Id,
|
?TRY_PARSE_ID(Id,
|
||||||
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
case ensure_bridge(BridgeType, BridgeName, Conf) of
|
case ensure_bridge_created(BridgeType, BridgeName, Conf) of
|
||||||
ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200);
|
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 200);
|
||||||
{error, Error} -> {400, Error}
|
{error, Error} -> {400, Error}
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
|
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
|
||||||
end);
|
end);
|
||||||
|
|
||||||
crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
|
'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
|
||||||
?TRY_PARSE_ID(Id,
|
?TRY_PARSE_ID(Id,
|
||||||
case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
|
case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
|
||||||
#{override_to => cluster}) of
|
#{override_to => cluster}) of
|
||||||
|
@ -262,12 +308,12 @@ crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
|
||||||
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
|
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
|
||||||
end).
|
end).
|
||||||
|
|
||||||
lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
|
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||||
case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
|
case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
|
||||||
{ok, [{ok, _} | _] = Results} ->
|
{ok, [{ok, _} | _] = Results} ->
|
||||||
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
||||||
{ok, [{error, not_found} | _]} ->
|
{ok, [{error, not_found} | _]} ->
|
||||||
{404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)};
|
{404, error_msg('NOT_FOUND', <<"not_found">>)};
|
||||||
{error, ErrL} ->
|
{error, ErrL} ->
|
||||||
{500, error_msg('UNKNOWN_ERROR', ErrL)}
|
{500, error_msg('UNKNOWN_ERROR', ErrL)}
|
||||||
end.
|
end.
|
||||||
|
@ -278,7 +324,8 @@ lookup_from_local_node(BridgeType, BridgeName) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
|
'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
|
||||||
|
#{node := Node, id := Id, operation := Op}}) ->
|
||||||
OperFun =
|
OperFun =
|
||||||
fun (<<"start">>) -> start;
|
fun (<<"start">>) -> start;
|
||||||
(<<"stop">>) -> stop;
|
(<<"stop">>) -> stop;
|
||||||
|
@ -292,9 +339,10 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
|
||||||
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
|
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
|
||||||
end).
|
end).
|
||||||
|
|
||||||
ensure_bridge(BridgeType, BridgeName, Conf) ->
|
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
|
||||||
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
|
Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
|
||||||
#{override_to => cluster}) of
|
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
|
||||||
|
Conf1, #{override_to => cluster}) of
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, error_msg('BAD_ARG', Reason)}
|
{error, error_msg('BAD_ARG', Reason)}
|
||||||
|
@ -351,7 +399,7 @@ format_resp(#{id := Id, raw_config := RawConf,
|
||||||
RawConf#{
|
RawConf#{
|
||||||
id => Id,
|
id => Id,
|
||||||
node => node(),
|
node => node(),
|
||||||
bridge_type => emqx_bridge:bridge_type(Mod),
|
type => emqx_bridge:bridge_type(Mod),
|
||||||
status => IsConnected(Status),
|
status => IsConnected(Status),
|
||||||
metrics => Metrics
|
metrics => Metrics
|
||||||
}.
|
}.
|
||||||
|
@ -379,3 +427,10 @@ error_msg(Code, Msg) when is_binary(Msg) ->
|
||||||
#{code => Code, message => Msg};
|
#{code => Code, message => Msg};
|
||||||
error_msg(Code, Msg) ->
|
error_msg(Code, Msg) ->
|
||||||
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
|
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
|
||||||
|
|
||||||
|
bin(S) when is_atom(S) ->
|
||||||
|
atom_to_binary(S, utf8);
|
||||||
|
bin(S) when is_list(S) ->
|
||||||
|
list_to_binary(S);
|
||||||
|
bin(S) when is_binary(S) ->
|
||||||
|
S.
|
||||||
|
|
|
@ -0,0 +1,95 @@
|
||||||
|
-module(emqx_bridge_http_schema).
|
||||||
|
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
|
||||||
|
-import(hoconsc, [mk/2, enum/1]).
|
||||||
|
|
||||||
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
|
%%======================================================================================
|
||||||
|
%% Hocon Schema Definitions
|
||||||
|
roots() -> [].
|
||||||
|
|
||||||
|
fields("bridge") ->
|
||||||
|
basic_config() ++
|
||||||
|
[ {url, mk(binary(),
|
||||||
|
#{ nullable => false
|
||||||
|
, desc =>"""
|
||||||
|
The URL of the HTTP Bridge.<br>
|
||||||
|
Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
|
||||||
|
or port part.<br>
|
||||||
|
For example, <code> http://localhost:9901/${topic} </code> is allowed, but
|
||||||
|
<code> http://${host}:9901/message </code> or <code> http://localhost:${port}/message </code>
|
||||||
|
is not allowed.
|
||||||
|
"""
|
||||||
|
})}
|
||||||
|
, {from_local_topic, mk(binary(),
|
||||||
|
#{ desc =>"""
|
||||||
|
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
|
||||||
|
match the from_local_topic will be forwarded.<br>
|
||||||
|
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
|
||||||
|
from_local_topic will be forwarded.
|
||||||
|
"""
|
||||||
|
})}
|
||||||
|
, {method, mk(method(),
|
||||||
|
#{ default => post
|
||||||
|
, desc =>"""
|
||||||
|
The method of the HTTP request. All the available methods are: post, put, get, delete.<br>
|
||||||
|
Template with variables is allowed.<br>
|
||||||
|
"""
|
||||||
|
})}
|
||||||
|
, {headers, mk(map(),
|
||||||
|
#{ default => #{
|
||||||
|
<<"accept">> => <<"application/json">>,
|
||||||
|
<<"cache-control">> => <<"no-cache">>,
|
||||||
|
<<"connection">> => <<"keep-alive">>,
|
||||||
|
<<"content-type">> => <<"application/json">>,
|
||||||
|
<<"keep-alive">> => <<"timeout=5">>}
|
||||||
|
, desc =>"""
|
||||||
|
The headers of the HTTP request.<br>
|
||||||
|
Template with variables is allowed.
|
||||||
|
"""
|
||||||
|
})
|
||||||
|
}
|
||||||
|
, {body, mk(binary(),
|
||||||
|
#{ default => <<"${payload}">>
|
||||||
|
, desc =>"""
|
||||||
|
The body of the HTTP request.<br>
|
||||||
|
Template with variables is allowed.
|
||||||
|
"""
|
||||||
|
})}
|
||||||
|
, {request_timeout, mk(emqx_schema:duration_ms(),
|
||||||
|
#{ default => <<"30s">>
|
||||||
|
, desc =>"""
|
||||||
|
How long will the HTTP request timeout.
|
||||||
|
"""
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
|
||||||
|
fields("post") ->
|
||||||
|
[ type_field()
|
||||||
|
, name_field()
|
||||||
|
] ++ fields("bridge");
|
||||||
|
|
||||||
|
fields("put") ->
|
||||||
|
fields("bridge");
|
||||||
|
|
||||||
|
fields("get") ->
|
||||||
|
[ id_field()
|
||||||
|
] ++ fields("post").
|
||||||
|
|
||||||
|
basic_config() ->
|
||||||
|
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
|
|
||||||
|
%%======================================================================================
|
||||||
|
id_field() ->
|
||||||
|
{id, mk(binary(), #{desc => "The Bridge Id", example => "http:my_http_bridge"})}.
|
||||||
|
|
||||||
|
type_field() ->
|
||||||
|
{type, mk(http, #{desc => "The Bridge Type"})}.
|
||||||
|
|
||||||
|
name_field() ->
|
||||||
|
{name, mk(binary(), #{desc => "The Bridge Name"})}.
|
||||||
|
|
||||||
|
method() ->
|
||||||
|
enum([post, put, get, delete]).
|
|
@ -0,0 +1,62 @@
|
||||||
|
-module(emqx_bridge_mqtt_schema).
|
||||||
|
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
|
||||||
|
-import(hoconsc, [mk/2]).
|
||||||
|
|
||||||
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
|
%%======================================================================================
|
||||||
|
%% Hocon Schema Definitions
|
||||||
|
roots() -> [].
|
||||||
|
|
||||||
|
fields("ingress") ->
|
||||||
|
[ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
|
||||||
|
, emqx_bridge_schema:connector_name()
|
||||||
|
] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
|
||||||
|
|
||||||
|
fields("egress") ->
|
||||||
|
[ direction(egress, emqx_connector_mqtt_schema:egress_desc())
|
||||||
|
, emqx_bridge_schema:connector_name()
|
||||||
|
] ++ emqx_connector_mqtt_schema:fields("egress");
|
||||||
|
|
||||||
|
fields("post_ingress") ->
|
||||||
|
[ type_field()
|
||||||
|
, name_field()
|
||||||
|
] ++ fields("ingress");
|
||||||
|
fields("post_egress") ->
|
||||||
|
[ type_field()
|
||||||
|
, name_field()
|
||||||
|
] ++ fields("egress");
|
||||||
|
|
||||||
|
fields("put_ingress") ->
|
||||||
|
fields("ingress");
|
||||||
|
fields("put_egress") ->
|
||||||
|
fields("egress");
|
||||||
|
|
||||||
|
fields("get_ingress") ->
|
||||||
|
[ id_field()
|
||||||
|
] ++ fields("post_ingress");
|
||||||
|
fields("get_egress") ->
|
||||||
|
[ id_field()
|
||||||
|
] ++ fields("post_egress").
|
||||||
|
|
||||||
|
%%======================================================================================
|
||||||
|
direction(Dir, Desc) ->
|
||||||
|
{direction, mk(Dir,
|
||||||
|
#{ nullable => false
|
||||||
|
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>"
|
||||||
|
++ Desc
|
||||||
|
})}.
|
||||||
|
|
||||||
|
id_field() ->
|
||||||
|
{id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}.
|
||||||
|
|
||||||
|
type_field() ->
|
||||||
|
{type, mk(mqtt, #{desc => "The Bridge Type"})}.
|
||||||
|
|
||||||
|
name_field() ->
|
||||||
|
{name, mk(binary(),
|
||||||
|
#{ desc => "The Bridge Name"
|
||||||
|
, example => "some_bridge_name"
|
||||||
|
})}.
|
|
@ -2,105 +2,27 @@
|
||||||
|
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
|
||||||
|
-import(hoconsc, [mk/2, ref/2]).
|
||||||
|
|
||||||
-export([roots/0, fields/1]).
|
-export([roots/0, fields/1]).
|
||||||
|
|
||||||
|
-export([ get_response/0
|
||||||
|
, put_request/0
|
||||||
|
, post_request/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ connector_name/0
|
||||||
|
]).
|
||||||
|
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
%% Hocon Schema Definitions
|
%% Hocon Schema Definitions
|
||||||
|
|
||||||
roots() -> [bridges].
|
-define(CONN_TYPES, [mqtt]).
|
||||||
|
|
||||||
fields(bridges) ->
|
%%======================================================================================
|
||||||
[ {mqtt,
|
%% For HTTP APIs
|
||||||
sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge")
|
get_response() ->
|
||||||
, ref("egress_mqtt_bridge")
|
http_schema("get").
|
||||||
])),
|
|
||||||
#{ desc => "MQTT bridges"
|
|
||||||
})}
|
|
||||||
, {http,
|
|
||||||
sc(hoconsc:map(name, ref("http_bridge")),
|
|
||||||
#{ desc => "HTTP bridges"
|
|
||||||
})}
|
|
||||||
];
|
|
||||||
|
|
||||||
fields("ingress_mqtt_bridge") ->
|
|
||||||
[ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
|
|
||||||
, connector_name()
|
|
||||||
] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
|
|
||||||
|
|
||||||
fields("egress_mqtt_bridge") ->
|
|
||||||
[ direction(egress, emqx_connector_mqtt_schema:egress_desc())
|
|
||||||
, connector_name()
|
|
||||||
] ++ emqx_connector_mqtt_schema:fields("egress");
|
|
||||||
|
|
||||||
fields("http_bridge") ->
|
|
||||||
basic_config_http() ++
|
|
||||||
[ {url,
|
|
||||||
sc(binary(),
|
|
||||||
#{ nullable => false
|
|
||||||
, desc =>"""
|
|
||||||
The URL of the HTTP Bridge.<br>
|
|
||||||
Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
|
|
||||||
or port part.<br>
|
|
||||||
For example, <code> http://localhost:9901/${topic} </code> is allowed, but
|
|
||||||
<code> http://${host}:9901/message </code> or <code> http://localhost:${port}/message </code>
|
|
||||||
is not allowed.
|
|
||||||
"""
|
|
||||||
})}
|
|
||||||
, {from_local_topic,
|
|
||||||
sc(binary(),
|
|
||||||
#{ desc =>"""
|
|
||||||
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
|
|
||||||
match the from_local_topic will be forwarded.<br>
|
|
||||||
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
|
|
||||||
from_local_topic will be forwarded.
|
|
||||||
"""
|
|
||||||
})}
|
|
||||||
, {method,
|
|
||||||
sc(method(),
|
|
||||||
#{ default => post
|
|
||||||
, desc =>"""
|
|
||||||
The method of the HTTP request. All the available methods are: post, put, get, delete.<br>
|
|
||||||
Template with variables is allowed.<br>
|
|
||||||
"""
|
|
||||||
})}
|
|
||||||
, {headers,
|
|
||||||
sc(map(),
|
|
||||||
#{ default => #{
|
|
||||||
<<"accept">> => <<"application/json">>,
|
|
||||||
<<"cache-control">> => <<"no-cache">>,
|
|
||||||
<<"connection">> => <<"keep-alive">>,
|
|
||||||
<<"content-type">> => <<"application/json">>,
|
|
||||||
<<"keep-alive">> => <<"timeout=5">>}
|
|
||||||
, desc =>"""
|
|
||||||
The headers of the HTTP request.<br>
|
|
||||||
Template with variables is allowed.
|
|
||||||
"""
|
|
||||||
})
|
|
||||||
}
|
|
||||||
, {body,
|
|
||||||
sc(binary(),
|
|
||||||
#{ default => <<"${payload}">>
|
|
||||||
, desc =>"""
|
|
||||||
The body of the HTTP request.<br>
|
|
||||||
Template with variables is allowed.
|
|
||||||
"""
|
|
||||||
})}
|
|
||||||
, {request_timeout,
|
|
||||||
sc(emqx_schema:duration_ms(),
|
|
||||||
#{ default => <<"30s">>
|
|
||||||
, desc =>"""
|
|
||||||
How long will the HTTP request timeout.
|
|
||||||
"""
|
|
||||||
})}
|
|
||||||
].
|
|
||||||
|
|
||||||
direction(Dir, Desc) ->
|
|
||||||
{direction,
|
|
||||||
sc(Dir,
|
|
||||||
#{ nullable => false
|
|
||||||
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>" ++
|
|
||||||
Desc
|
|
||||||
})}.
|
|
||||||
|
|
||||||
connector_name() ->
|
connector_name() ->
|
||||||
{connector,
|
{connector,
|
||||||
|
@ -108,17 +30,35 @@ connector_name() ->
|
||||||
#{ nullable => false
|
#{ nullable => false
|
||||||
, desc =>"""
|
, desc =>"""
|
||||||
The connector name to be used for this bridge.
|
The connector name to be used for this bridge.
|
||||||
Connectors are configured as 'connectors.type.name',
|
Connectors are configured as 'connectors.{type}.{name}',
|
||||||
for example 'connectors.http.mybridge'.
|
for example 'connectors.http.mybridge'.
|
||||||
"""
|
"""
|
||||||
})}.
|
})}.
|
||||||
|
|
||||||
basic_config_http() ->
|
put_request() ->
|
||||||
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
http_schema("put").
|
||||||
|
|
||||||
method() ->
|
post_request() ->
|
||||||
hoconsc:enum([post, put, get, delete]).
|
http_schema("post").
|
||||||
|
|
||||||
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
|
http_schema(Method) ->
|
||||||
|
Schemas = lists:flatmap(fun(Type) ->
|
||||||
|
[ref(schema_mod(Type), Method ++ "_ingress"),
|
||||||
|
ref(schema_mod(Type), Method ++ "_egress")]
|
||||||
|
end, ?CONN_TYPES),
|
||||||
|
hoconsc:union([ref(emqx_bridge_http_schema, Method)
|
||||||
|
| Schemas]).
|
||||||
|
|
||||||
ref(Field) -> hoconsc:ref(?MODULE, Field).
|
%%======================================================================================
|
||||||
|
%% For config files
|
||||||
|
roots() -> [bridges].
|
||||||
|
|
||||||
|
fields(bridges) ->
|
||||||
|
[{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}]
|
||||||
|
++ [{T, mk(hoconsc:map(name, hoconsc:union([
|
||||||
|
ref(schema_mod(T), "ingress"),
|
||||||
|
ref(schema_mod(T), "egress")
|
||||||
|
])), #{})} || T <- ?CONN_TYPES].
|
||||||
|
|
||||||
|
schema_mod(Type) ->
|
||||||
|
list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).
|
||||||
|
|
|
@ -62,7 +62,7 @@ post_request_body_schema() ->
|
||||||
connector_info(post_req), connector_info_examples()).
|
connector_info(post_req), connector_info_examples()).
|
||||||
|
|
||||||
get_response_body_schema() ->
|
get_response_body_schema() ->
|
||||||
emqx_dashboard_swagger:schema_with_example(
|
emqx_dashboard_swagger:schema_with_examples(
|
||||||
connector_info(), connector_info_examples()).
|
connector_info(), connector_info_examples()).
|
||||||
|
|
||||||
connector_info() ->
|
connector_info() ->
|
||||||
|
|
|
@ -100,7 +100,7 @@ on_start(InstId, Conf) ->
|
||||||
BasicConf = basic_config(Conf),
|
BasicConf = basic_config(Conf),
|
||||||
BridgeConf = BasicConf#{
|
BridgeConf = BasicConf#{
|
||||||
name => InstanceId,
|
name => InstanceId,
|
||||||
clientid => clientid(maps:get(clientid, Conf, InstanceId)),
|
clientid => clientid(maps:get(clientid, Conf, InstId)),
|
||||||
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
|
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
|
||||||
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
|
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
|
||||||
},
|
},
|
||||||
|
@ -190,4 +190,4 @@ basic_config(#{
|
||||||
}.
|
}.
|
||||||
|
|
||||||
clientid(Id) ->
|
clientid(Id) ->
|
||||||
unicode:characters_to_binary([Id, ":", atom_to_list(node())], utf8).
|
iolist_to_binary([Id, ":", atom_to_list(node())]).
|
||||||
|
|
Loading…
Reference in New Issue