From 56d46c80ebf6c502fb47c2ef89f022e87f1f4186 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 4 Dec 2021 21:22:20 +0800 Subject: [PATCH] refactor(rule): generate swagger from hocon schema for /bridges --- apps/emqx_bridge/src/emqx_bridge_api.erl | 365 ++++++++++-------- .../src/emqx_bridge_http_schema.erl | 95 +++++ .../src/emqx_bridge_mqtt_schema.erl | 62 +++ apps/emqx_bridge/src/emqx_bridge_schema.erl | 140 ++----- .../emqx_connector/src/emqx_connector_api.erl | 2 +- .../src/emqx_connector_mqtt.erl | 4 +- 6 files changed, 410 insertions(+), 258 deletions(-) create mode 100644 apps/emqx_bridge/src/emqx_bridge_http_schema.erl create mode 100644 apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a82969dde..a5c24be9f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -17,23 +17,32 @@ -behaviour(minirest_api). --export([api_spec/0]). +-include_lib("typerefl/include/types.hrl"). --export([ list_create_bridges_in_cluster/2 - , list_local_bridges/1 - , crud_bridges_in_cluster/2 - , manage_bridges/2 +-import(hoconsc, [mk/2, array/1, enum/1]). + +%% Swagger specs from hocon schema +-export([api_spec/0, paths/0, schema/1, namespace/0]). + +%% API callbacks +-export(['/bridges'/2, '/bridges/:id'/2, + '/nodes/:node/bridges/:id/operation/:operation'/2]). + +-export([ list_local_bridges/1 , lookup_from_local_node/2 ]). -define(TYPES, [mqtt, http]). + +-define(CONN_TYPES, [mqtt]). + -define(TRY_PARSE_ID(ID, EXPR), try emqx_bridge:parse_bridge_id(Id) of {BridgeType, BridgeName} -> EXPR catch error:{invalid_bridge_id, Id0} -> {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary, - ". Bridge ID must be of format 'bridge_type:name'">>}} + ". Bridge Ids must be of format {type}:{name}">>}} end). -define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), @@ -53,184 +62,221 @@ rate_max := RATE_MAX }). -req_schema() -> - Schema = [ - case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of - %% the bridge is not configured, so we have no method to get the schema - [] -> #{}; - [{_K, Conf} | _] -> - emqx_mgmt_api_configs:gen_schema(Conf) - end - || T <- ?TYPES], - #{'oneOf' => Schema}. - -node_schema() -> - #{type => string, example => "emqx@127.0.0.1"}. - -status_schema() -> - #{type => string, enum => [connected, disconnected]}. - -metrics_schema() -> - #{ type => object - , properties => #{ - matched => #{type => integer, example => "0"}, - success => #{type => integer, example => "0"}, - failed => #{type => integer, example => "0"}, - rate => #{type => number, format => float, example => "0.0"}, - rate_last5m => #{type => number, format => float, example => "0.0"}, - rate_max => #{type => number, format => float, example => "0.0"} - } - }. - -per_node_schema(Key, Schema) -> - #{ - type => array, - items => #{ - type => object, - properties => #{ - node => node_schema(), - Key => Schema - } - } - }. - -resp_schema() -> - AddMetadata = fun(Prop) -> - Prop#{status => status_schema(), - node_status => per_node_schema(status, status_schema()), - metrics => metrics_schema(), - node_metrics => per_node_schema(metrics, metrics_schema()), - id => #{type => string, example => "http:my_http_bridge"}, - bridge_type => #{type => string, enum => ?TYPES}, - node => node_schema() - } - end, - more_props_resp_schema(AddMetadata). - -more_props_resp_schema(AddMetadata) -> - #{'oneOf' := Schema} = req_schema(), - Schema1 = [S#{properties => AddMetadata(Prop)} - || S = #{properties := Prop} <- Schema], - #{'oneOf' => Schema1}. +namespace() -> "bridge". api_spec() -> - {bridge_apis(), []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). -bridge_apis() -> - [list_all_bridges_api(), crud_bridges_apis(), operation_apis()]. +paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"]. -list_all_bridges_api() -> - ReqSchema = more_props_resp_schema(fun(Prop) -> - Prop#{id => #{type => string, required => true}} - end), - RespSchema = resp_schema(), - Metadata = #{ +error_schema(Code, Message) -> + [ {code, mk(string(), #{example => Code})} + , {message, mk(string(), #{example => Message})} + ]. + +get_response_body_schema() -> + emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(), + bridge_info_examples(get)). + +param_path_node() -> + path_param(node, binary(), atom_to_binary(node(), utf8)). + +param_path_operation() -> + path_param(operation, enum([start, stop, restart]), <<"start">>). + +param_path_id() -> + path_param(id, binary(), <<"http:my_http_bridge">>). + +path_param(Name, Type, Example) -> + {Name, mk(Type, + #{ in => path + , required => true + , example => Example + })}. + +bridge_info_array_example(Method) -> + [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))]. + +bridge_info_examples(Method) -> + maps:merge(conn_bridge_examples(Method), #{ + <<"http_bridge">> => #{ + summary => <<"HTTP Bridge">>, + value => info_example(http, awesome, Method) + } + }). + +conn_bridge_examples(Method) -> + lists:foldl(fun(Type, Acc) -> + SType = atom_to_list(Type), + KeyIngress = bin(SType ++ "_ingress"), + KeyEgress = bin(SType ++ "_egress"), + maps:merge(Acc, #{ + KeyIngress => #{ + summary => bin(string:uppercase(SType) ++ " Ingress Bridge"), + value => info_example(Type, ingress, Method) + }, + KeyEgress => #{ + summary => bin(string:uppercase(SType) ++ " Egress Bridge"), + value => info_example(Type, egress, Method) + } + }) + end, #{}, ?CONN_TYPES). + +info_example(Type, Direction, Method) -> + maps:merge(info_example_basic(Type, Direction), + method_example(Type, Direction, Method)). + +method_example(Type, Direction, get) -> + SType = atom_to_list(Type), + SDir = atom_to_list(Direction), + SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge", + #{ + id => bin(SType ++ ":" ++ SName), + type => bin(SType), + name => bin(SName) + }; +method_example(Type, Direction, post) -> + SType = atom_to_list(Type), + SDir = atom_to_list(Direction), + SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge", + #{ + type => bin(SType), + name => bin(SName) + }; +method_example(_Type, _Direction, put) -> + #{}. + +info_example_basic(http, _) -> + #{ + url => <<"http://localhost:9901/messages/${topic}">>, + request_timeout => <<"30s">>, + connect_timeout => <<"30s">>, + max_retries => 3, + retry_interval => <<"10s">>, + pool_type => <<"random">>, + pool_size => 4, + enable_pipelining => true, + ssl => #{enable => false}, + from_local_topic => <<"emqx_http/#">>, + method => post, + body => <<"${payload}">> + }; +info_example_basic(mqtt, ingress) -> + #{ + connector => <<"mqtt:my_mqtt_connector">>, + direction => ingress, + from_remote_topic => <<"aws/#">>, + subscribe_qos => 1, + to_local_topic => <<"from_aws/${topic}">>, + payload => <<"${payload}">>, + qos => <<"${qos}">>, + retain => <<"${retain}">> + }; +info_example_basic(mqtt, egress) -> + #{ + connector => <<"mqtt:my_mqtt_connector">>, + direction => egress, + from_local_topic => <<"emqx/#">>, + to_remote_topic => <<"from_emqx/${topic}">>, + payload => <<"${payload}">>, + qos => 1, + retain => false + }. + +schema("/bridges") -> + #{ + operationId => '/bridges', get => #{ + tags => [<<"bridges">>], + summary => <<"List Bridges">>, description => <<"List all created bridges">>, responses => #{ - <<"200">> => emqx_mgmt_util:array_schema(resp_schema(), - <<"A list of the bridges">>) + 200 => emqx_dashboard_swagger:schema_with_example( + array(emqx_bridge_schema:get_response()), + bridge_info_array_example(get)) } }, post => #{ + tags => [<<"bridges">>], + summary => <<"Create Bridge">>, description => <<"Create a new bridge">>, - 'requestBody' => emqx_mgmt_util:schema(ReqSchema), + requestBody => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_schema:post_request(), + bridge_info_examples(post)), responses => #{ - <<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>), - <<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>, - ['UPDATE_FAILED']) + 201 => get_response_body_schema(), + 400 => error_schema('BAD_ARG', "Create bridge failed") } } - }, - {"/bridges/", Metadata, list_create_bridges_in_cluster}. + }; -crud_bridges_apis() -> - ReqSchema = req_schema(), - RespSchema = resp_schema(), - Metadata = #{ +schema("/bridges/:id") -> + #{ + operationId => '/bridges/:id', get => #{ + tags => [<<"bridges">>], + summary => <<"Get Bridge">>, description => <<"Get a bridge by Id">>, parameters => [param_path_id()], responses => #{ - <<"200">> => emqx_mgmt_util:array_schema(RespSchema, - <<"The details of the bridge">>), - <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND']) + 200 => get_response_body_schema(), + 404 => error_schema('NOT_FOUND', "Bridge not found") } }, put => #{ + tags => [<<"bridges">>], + summary => <<"Update Bridge">>, description => <<"Update a bridge">>, parameters => [param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(ReqSchema), + requestBody => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_schema:put_request(), + bridge_info_examples(put)), responses => #{ - <<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>), - <<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>, - ['UPDATE_FAILED']) + 200 => get_response_body_schema(), + 400 => error_schema('BAD_ARG', "Update bridge failed") } }, delete => #{ + tags => [<<"bridges">>], + summary => <<"Delete Bridge">>, description => <<"Delete a bridge">>, parameters => [param_path_id()], responses => #{ - <<"204">> => emqx_mgmt_util:schema(<<"Bridge deleted">>), - <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND']) + 204 => <<"Bridge deleted">> } } - }, - {"/bridges/:id", Metadata, crud_bridges_in_cluster}. + }; -operation_apis() -> - Metadata = #{ +schema("/nodes/:node/bridges/:id/operation/:operation") -> + #{ + operationId => '/nodes/:node/bridges/:id/operation/:operation', post => #{ + tags => [<<"bridges">>], + summary => <<"Start/Stop/Restart Bridge">>, description => <<"Start/Stop/Restart bridges on a specific node">>, parameters => [ param_path_node(), param_path_id(), - param_path_operation()], + param_path_operation() + ], responses => #{ - <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>, - ['INTERNAL_ERROR']), - <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, - {"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}. - -param_path_node() -> - #{ - name => node, - in => path, - schema => #{type => string}, - required => true, - example => node() + 500 => error_schema('INTERNAL_ERROR', "Operation Failed"), + 200 => <<"Operation success">> + } + } }. -param_path_id() -> - #{ - name => id, - in => path, - schema => #{type => string}, - required => true - }. - -param_path_operation()-> - #{ - name => operation, - in => path, - required => true, - schema => #{ - type => string, - enum => [start, stop, restart]}, - example => restart - }. - -list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) -> - ?TRY_PARSE_ID(Id, - case emqx_bridge:lookup(BridgeType, BridgeName) of - {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}}; - {error, not_found} -> - case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of - ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201); - {error, Error} -> {400, Error} - end - end); -list_create_bridges_in_cluster(get, _Params) -> +'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) -> + BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()), + case emqx_bridge:lookup(BridgeType, BridgeName) of + {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}}; + {error, not_found} -> + case ensure_bridge_created(BridgeType, BridgeName, Conf) of + ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201); + {error, Error} -> {400, Error} + end + end; +'/bridges'(get, _Params) -> {200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}. list_local_bridges(Node) when Node =:= node() -> @@ -238,22 +284,22 @@ list_local_bridges(Node) when Node =:= node() -> list_local_bridges(Node) -> rpc_call(Node, list_local_bridges, [Node]). -crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) -> - ?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200)); +'/bridges/:id'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); -crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) -> +'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) -> ?TRY_PARSE_ID(Id, case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> - case ensure_bridge(BridgeType, BridgeName, Conf) of - ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200); + case ensure_bridge_created(BridgeType, BridgeName, Conf) of + ok -> lookup_from_all_nodes(BridgeType, BridgeName, 200); {error, Error} -> {400, Error} end; {error, not_found} -> {404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}} end); -crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) -> +'/bridges/:id'(delete, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], #{override_to => cluster}) of @@ -262,12 +308,12 @@ crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) -> {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}} end). -lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) -> +lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of {ok, [{ok, _} | _] = Results} -> {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> - {404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)}; + {404, error_msg('NOT_FOUND', <<"not_found">>)}; {error, ErrL} -> {500, error_msg('UNKNOWN_ERROR', ErrL)} end. @@ -278,7 +324,8 @@ lookup_from_local_node(BridgeType, BridgeName) -> Error -> Error end. -manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) -> +'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings := + #{node := Node, id := Id, operation := Op}}) -> OperFun = fun (<<"start">>) -> start; (<<"stop">>) -> stop; @@ -292,9 +339,10 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}} end). -ensure_bridge(BridgeType, BridgeName, Conf) -> - case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf, - #{override_to => cluster}) of +ensure_bridge_created(BridgeType, BridgeName, Conf) -> + Conf1 = maps:without([<<"type">>, <<"name">>], Conf), + case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], + Conf1, #{override_to => cluster}) of {ok, _} -> ok; {error, Reason} -> {error, error_msg('BAD_ARG', Reason)} @@ -351,7 +399,7 @@ format_resp(#{id := Id, raw_config := RawConf, RawConf#{ id => Id, node => node(), - bridge_type => emqx_bridge:bridge_type(Mod), + type => emqx_bridge:bridge_type(Mod), status => IsConnected(Status), metrics => Metrics }. @@ -379,3 +427,10 @@ error_msg(Code, Msg) when is_binary(Msg) -> #{code => Code, message => Msg}; error_msg(Code, Msg) -> #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}. + +bin(S) when is_atom(S) -> + atom_to_binary(S, utf8); +bin(S) when is_list(S) -> + list_to_binary(S); +bin(S) when is_binary(S) -> + S. diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl new file mode 100644 index 000000000..2bef474bd --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -0,0 +1,95 @@ +-module(emqx_bridge_http_schema). + +-include_lib("typerefl/include/types.hrl"). + +-import(hoconsc, [mk/2, enum/1]). + +-export([roots/0, fields/1]). + +%%====================================================================================== +%% Hocon Schema Definitions +roots() -> []. + +fields("bridge") -> + basic_config() ++ + [ {url, mk(binary(), + #{ nullable => false + , desc =>""" +The URL of the HTTP Bridge.
+Template with variables is allowed in the path, but variables cannot be used in the scheme, host, +or port part.
+For example, http://localhost:9901/${topic} is allowed, but + http://${host}:9901/message or http://localhost:${port}/message +is not allowed. +""" + })} + , {from_local_topic, mk(binary(), + #{ desc =>""" +The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic +match the from_local_topic will be forwarded.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches +from_local_topic will be forwarded. +""" + })} + , {method, mk(method(), + #{ default => post + , desc =>""" +The method of the HTTP request. All the available methods are: post, put, get, delete.
+Template with variables is allowed.
+""" + })} + , {headers, mk(map(), + #{ default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">>} + , desc =>""" +The headers of the HTTP request.
+Template with variables is allowed. +""" + }) + } + , {body, mk(binary(), + #{ default => <<"${payload}">> + , desc =>""" +The body of the HTTP request.
+Template with variables is allowed. +""" + })} + , {request_timeout, mk(emqx_schema:duration_ms(), + #{ default => <<"30s">> + , desc =>""" +How long will the HTTP request timeout. +""" + })} + ]; + +fields("post") -> + [ type_field() + , name_field() + ] ++ fields("bridge"); + +fields("put") -> + fields("bridge"); + +fields("get") -> + [ id_field() + ] ++ fields("post"). + +basic_config() -> + proplists:delete(base_url, emqx_connector_http:fields(config)). + +%%====================================================================================== +id_field() -> + {id, mk(binary(), #{desc => "The Bridge Id", example => "http:my_http_bridge"})}. + +type_field() -> + {type, mk(http, #{desc => "The Bridge Type"})}. + +name_field() -> + {name, mk(binary(), #{desc => "The Bridge Name"})}. + +method() -> + enum([post, put, get, delete]). diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl new file mode 100644 index 000000000..d2cf6b1a8 --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl @@ -0,0 +1,62 @@ +-module(emqx_bridge_mqtt_schema). + +-include_lib("typerefl/include/types.hrl"). + +-import(hoconsc, [mk/2]). + +-export([roots/0, fields/1]). + +%%====================================================================================== +%% Hocon Schema Definitions +roots() -> []. + +fields("ingress") -> + [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc()) + , emqx_bridge_schema:connector_name() + ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress")); + +fields("egress") -> + [ direction(egress, emqx_connector_mqtt_schema:egress_desc()) + , emqx_bridge_schema:connector_name() + ] ++ emqx_connector_mqtt_schema:fields("egress"); + +fields("post_ingress") -> + [ type_field() + , name_field() + ] ++ fields("ingress"); +fields("post_egress") -> + [ type_field() + , name_field() + ] ++ fields("egress"); + +fields("put_ingress") -> + fields("ingress"); +fields("put_egress") -> + fields("egress"); + +fields("get_ingress") -> + [ id_field() + ] ++ fields("post_ingress"); +fields("get_egress") -> + [ id_field() + ] ++ fields("post_egress"). + +%%====================================================================================== +direction(Dir, Desc) -> + {direction, mk(Dir, + #{ nullable => false + , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" + ++ Desc + })}. + +id_field() -> + {id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}. + +type_field() -> + {type, mk(mqtt, #{desc => "The Bridge Type"})}. + +name_field() -> + {name, mk(binary(), + #{ desc => "The Bridge Name" + , example => "some_bridge_name" + })}. diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 3a3151ef0..2038e6c04 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -2,105 +2,27 @@ -include_lib("typerefl/include/types.hrl"). +-import(hoconsc, [mk/2, ref/2]). + -export([roots/0, fields/1]). +-export([ get_response/0 + , put_request/0 + , post_request/0 + ]). + +-export([ connector_name/0 + ]). + %%====================================================================================== %% Hocon Schema Definitions -roots() -> [bridges]. +-define(CONN_TYPES, [mqtt]). -fields(bridges) -> - [ {mqtt, - sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge") - , ref("egress_mqtt_bridge") - ])), - #{ desc => "MQTT bridges" - })} - , {http, - sc(hoconsc:map(name, ref("http_bridge")), - #{ desc => "HTTP bridges" - })} - ]; - -fields("ingress_mqtt_bridge") -> - [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc()) - , connector_name() - ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress")); - -fields("egress_mqtt_bridge") -> - [ direction(egress, emqx_connector_mqtt_schema:egress_desc()) - , connector_name() - ] ++ emqx_connector_mqtt_schema:fields("egress"); - -fields("http_bridge") -> - basic_config_http() ++ - [ {url, - sc(binary(), - #{ nullable => false - , desc =>""" -The URL of the HTTP Bridge.
-Template with variables is allowed in the path, but variables cannot be used in the scheme, host, -or port part.
-For example, http://localhost:9901/${topic} is allowed, but - http://${host}:9901/message or http://localhost:${port}/message -is not allowed. -""" - })} - , {from_local_topic, - sc(binary(), - #{ desc =>""" -The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic -match the from_local_topic will be forwarded.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches -from_local_topic will be forwarded. -""" - })} - , {method, - sc(method(), - #{ default => post - , desc =>""" -The method of the HTTP request. All the available methods are: post, put, get, delete.
-Template with variables is allowed.
-""" - })} - , {headers, - sc(map(), - #{ default => #{ - <<"accept">> => <<"application/json">>, - <<"cache-control">> => <<"no-cache">>, - <<"connection">> => <<"keep-alive">>, - <<"content-type">> => <<"application/json">>, - <<"keep-alive">> => <<"timeout=5">>} - , desc =>""" -The headers of the HTTP request.
-Template with variables is allowed. -""" - }) - } - , {body, - sc(binary(), - #{ default => <<"${payload}">> - , desc =>""" -The body of the HTTP request.
-Template with variables is allowed. -""" - })} - , {request_timeout, - sc(emqx_schema:duration_ms(), - #{ default => <<"30s">> - , desc =>""" -How long will the HTTP request timeout. -""" - })} - ]. - -direction(Dir, Desc) -> - {direction, - sc(Dir, - #{ nullable => false - , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++ - Desc - })}. +%%====================================================================================== +%% For HTTP APIs +get_response() -> + http_schema("get"). connector_name() -> {connector, @@ -108,17 +30,35 @@ connector_name() -> #{ nullable => false , desc =>""" The connector name to be used for this bridge. -Connectors are configured as 'connectors.type.name', +Connectors are configured as 'connectors.{type}.{name}', for example 'connectors.http.mybridge'. """ })}. -basic_config_http() -> - proplists:delete(base_url, emqx_connector_http:fields(config)). +put_request() -> + http_schema("put"). -method() -> - hoconsc:enum([post, put, get, delete]). +post_request() -> + http_schema("post"). -sc(Type, Meta) -> hoconsc:mk(Type, Meta). +http_schema(Method) -> + Schemas = lists:flatmap(fun(Type) -> + [ref(schema_mod(Type), Method ++ "_ingress"), + ref(schema_mod(Type), Method ++ "_egress")] + end, ?CONN_TYPES), + hoconsc:union([ref(emqx_bridge_http_schema, Method) + | Schemas]). -ref(Field) -> hoconsc:ref(?MODULE, Field). +%%====================================================================================== +%% For config files +roots() -> [bridges]. + +fields(bridges) -> + [{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}] + ++ [{T, mk(hoconsc:map(name, hoconsc:union([ + ref(schema_mod(T), "ingress"), + ref(schema_mod(T), "egress") + ])), #{})} || T <- ?CONN_TYPES]. + +schema_mod(Type) -> + list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index bc865906f..1510f439e 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -62,7 +62,7 @@ post_request_body_schema() -> connector_info(post_req), connector_info_examples()). get_response_body_schema() -> - emqx_dashboard_swagger:schema_with_example( + emqx_dashboard_swagger:schema_with_examples( connector_info(), connector_info_examples()). connector_info() -> diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 190456262..9d11d7ac0 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -100,7 +100,7 @@ on_start(InstId, Conf) -> BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ name => InstanceId, - clientid => clientid(maps:get(clientid, Conf, InstanceId)), + clientid => clientid(maps:get(clientid, Conf, InstId)), subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, @@ -190,4 +190,4 @@ basic_config(#{ }. clientid(Id) -> - unicode:characters_to_binary([Id, ":", atom_to_list(node())], utf8). + iolist_to_binary([Id, ":", atom_to_list(node())]).