diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index a82969dde..a5c24be9f 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -17,23 +17,32 @@
-behaviour(minirest_api).
--export([api_spec/0]).
+-include_lib("typerefl/include/types.hrl").
--export([ list_create_bridges_in_cluster/2
- , list_local_bridges/1
- , crud_bridges_in_cluster/2
- , manage_bridges/2
+-import(hoconsc, [mk/2, array/1, enum/1]).
+
+%% Swagger specs from hocon schema
+-export([api_spec/0, paths/0, schema/1, namespace/0]).
+
+%% API callbacks
+-export(['/bridges'/2, '/bridges/:id'/2,
+ '/nodes/:node/bridges/:id/operation/:operation'/2]).
+
+-export([ list_local_bridges/1
, lookup_from_local_node/2
]).
-define(TYPES, [mqtt, http]).
+
+-define(CONN_TYPES, [mqtt]).
+
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
{BridgeType, BridgeName} -> EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
- ". Bridge ID must be of format 'bridge_type:name'">>}}
+ ". Bridge Ids must be of format {type}:{name}">>}}
end).
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
@@ -53,184 +62,221 @@
rate_max := RATE_MAX
}).
-req_schema() ->
- Schema = [
- case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of
- %% the bridge is not configured, so we have no method to get the schema
- [] -> #{};
- [{_K, Conf} | _] ->
- emqx_mgmt_api_configs:gen_schema(Conf)
- end
- || T <- ?TYPES],
- #{'oneOf' => Schema}.
-
-node_schema() ->
- #{type => string, example => "emqx@127.0.0.1"}.
-
-status_schema() ->
- #{type => string, enum => [connected, disconnected]}.
-
-metrics_schema() ->
- #{ type => object
- , properties => #{
- matched => #{type => integer, example => "0"},
- success => #{type => integer, example => "0"},
- failed => #{type => integer, example => "0"},
- rate => #{type => number, format => float, example => "0.0"},
- rate_last5m => #{type => number, format => float, example => "0.0"},
- rate_max => #{type => number, format => float, example => "0.0"}
- }
- }.
-
-per_node_schema(Key, Schema) ->
- #{
- type => array,
- items => #{
- type => object,
- properties => #{
- node => node_schema(),
- Key => Schema
- }
- }
- }.
-
-resp_schema() ->
- AddMetadata = fun(Prop) ->
- Prop#{status => status_schema(),
- node_status => per_node_schema(status, status_schema()),
- metrics => metrics_schema(),
- node_metrics => per_node_schema(metrics, metrics_schema()),
- id => #{type => string, example => "http:my_http_bridge"},
- bridge_type => #{type => string, enum => ?TYPES},
- node => node_schema()
- }
- end,
- more_props_resp_schema(AddMetadata).
-
-more_props_resp_schema(AddMetadata) ->
- #{'oneOf' := Schema} = req_schema(),
- Schema1 = [S#{properties => AddMetadata(Prop)}
- || S = #{properties := Prop} <- Schema],
- #{'oneOf' => Schema1}.
+namespace() -> "bridge".
api_spec() ->
- {bridge_apis(), []}.
+ emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
-bridge_apis() ->
- [list_all_bridges_api(), crud_bridges_apis(), operation_apis()].
+paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"].
-list_all_bridges_api() ->
- ReqSchema = more_props_resp_schema(fun(Prop) ->
- Prop#{id => #{type => string, required => true}}
- end),
- RespSchema = resp_schema(),
- Metadata = #{
+error_schema(Code, Message) ->
+ [ {code, mk(string(), #{example => Code})}
+ , {message, mk(string(), #{example => Message})}
+ ].
+
+get_response_body_schema() ->
+ emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
+ bridge_info_examples(get)).
+
+param_path_node() ->
+ path_param(node, binary(), atom_to_binary(node(), utf8)).
+
+param_path_operation() ->
+ path_param(operation, enum([start, stop, restart]), <<"start">>).
+
+param_path_id() ->
+ path_param(id, binary(), <<"http:my_http_bridge">>).
+
+path_param(Name, Type, Example) ->
+ {Name, mk(Type,
+ #{ in => path
+ , required => true
+ , example => Example
+ })}.
+
+bridge_info_array_example(Method) ->
+ [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
+
+bridge_info_examples(Method) ->
+ maps:merge(conn_bridge_examples(Method), #{
+ <<"http_bridge">> => #{
+ summary => <<"HTTP Bridge">>,
+ value => info_example(http, awesome, Method)
+ }
+ }).
+
+conn_bridge_examples(Method) ->
+ lists:foldl(fun(Type, Acc) ->
+ SType = atom_to_list(Type),
+ KeyIngress = bin(SType ++ "_ingress"),
+ KeyEgress = bin(SType ++ "_egress"),
+ maps:merge(Acc, #{
+ KeyIngress => #{
+ summary => bin(string:uppercase(SType) ++ " Ingress Bridge"),
+ value => info_example(Type, ingress, Method)
+ },
+ KeyEgress => #{
+ summary => bin(string:uppercase(SType) ++ " Egress Bridge"),
+ value => info_example(Type, egress, Method)
+ }
+ })
+ end, #{}, ?CONN_TYPES).
+
+info_example(Type, Direction, Method) ->
+ maps:merge(info_example_basic(Type, Direction),
+ method_example(Type, Direction, Method)).
+
+method_example(Type, Direction, get) ->
+ SType = atom_to_list(Type),
+ SDir = atom_to_list(Direction),
+ SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ #{
+ id => bin(SType ++ ":" ++ SName),
+ type => bin(SType),
+ name => bin(SName)
+ };
+method_example(Type, Direction, post) ->
+ SType = atom_to_list(Type),
+ SDir = atom_to_list(Direction),
+ SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ #{
+ type => bin(SType),
+ name => bin(SName)
+ };
+method_example(_Type, _Direction, put) ->
+ #{}.
+
+info_example_basic(http, _) ->
+ #{
+ url => <<"http://localhost:9901/messages/${topic}">>,
+ request_timeout => <<"30s">>,
+ connect_timeout => <<"30s">>,
+ max_retries => 3,
+ retry_interval => <<"10s">>,
+ pool_type => <<"random">>,
+ pool_size => 4,
+ enable_pipelining => true,
+ ssl => #{enable => false},
+ from_local_topic => <<"emqx_http/#">>,
+ method => post,
+ body => <<"${payload}">>
+ };
+info_example_basic(mqtt, ingress) ->
+ #{
+ connector => <<"mqtt:my_mqtt_connector">>,
+ direction => ingress,
+ from_remote_topic => <<"aws/#">>,
+ subscribe_qos => 1,
+ to_local_topic => <<"from_aws/${topic}">>,
+ payload => <<"${payload}">>,
+ qos => <<"${qos}">>,
+ retain => <<"${retain}">>
+ };
+info_example_basic(mqtt, egress) ->
+ #{
+ connector => <<"mqtt:my_mqtt_connector">>,
+ direction => egress,
+ from_local_topic => <<"emqx/#">>,
+ to_remote_topic => <<"from_emqx/${topic}">>,
+ payload => <<"${payload}">>,
+ qos => 1,
+ retain => false
+ }.
+
+schema("/bridges") ->
+ #{
+ operationId => '/bridges',
get => #{
+ tags => [<<"bridges">>],
+ summary => <<"List Bridges">>,
description => <<"List all created bridges">>,
responses => #{
- <<"200">> => emqx_mgmt_util:array_schema(resp_schema(),
- <<"A list of the bridges">>)
+ 200 => emqx_dashboard_swagger:schema_with_example(
+ array(emqx_bridge_schema:get_response()),
+ bridge_info_array_example(get))
}
},
post => #{
+ tags => [<<"bridges">>],
+ summary => <<"Create Bridge">>,
description => <<"Create a new bridge">>,
- 'requestBody' => emqx_mgmt_util:schema(ReqSchema),
+ requestBody => emqx_dashboard_swagger:schema_with_examples(
+ emqx_bridge_schema:post_request(),
+ bridge_info_examples(post)),
responses => #{
- <<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>),
- <<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>,
- ['UPDATE_FAILED'])
+ 201 => get_response_body_schema(),
+ 400 => error_schema('BAD_ARG', "Create bridge failed")
}
}
- },
- {"/bridges/", Metadata, list_create_bridges_in_cluster}.
+ };
-crud_bridges_apis() ->
- ReqSchema = req_schema(),
- RespSchema = resp_schema(),
- Metadata = #{
+schema("/bridges/:id") ->
+ #{
+ operationId => '/bridges/:id',
get => #{
+ tags => [<<"bridges">>],
+ summary => <<"Get Bridge">>,
description => <<"Get a bridge by Id">>,
parameters => [param_path_id()],
responses => #{
- <<"200">> => emqx_mgmt_util:array_schema(RespSchema,
- <<"The details of the bridge">>),
- <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
+ 200 => get_response_body_schema(),
+ 404 => error_schema('NOT_FOUND', "Bridge not found")
}
},
put => #{
+ tags => [<<"bridges">>],
+ summary => <<"Update Bridge">>,
description => <<"Update a bridge">>,
parameters => [param_path_id()],
- 'requestBody' => emqx_mgmt_util:schema(ReqSchema),
+ requestBody => emqx_dashboard_swagger:schema_with_examples(
+ emqx_bridge_schema:put_request(),
+ bridge_info_examples(put)),
responses => #{
- <<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>),
- <<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>,
- ['UPDATE_FAILED'])
+ 200 => get_response_body_schema(),
+ 400 => error_schema('BAD_ARG', "Update bridge failed")
}
},
delete => #{
+ tags => [<<"bridges">>],
+ summary => <<"Delete Bridge">>,
description => <<"Delete a bridge">>,
parameters => [param_path_id()],
responses => #{
- <<"204">> => emqx_mgmt_util:schema(<<"Bridge deleted">>),
- <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
+ 204 => <<"Bridge deleted">>
}
}
- },
- {"/bridges/:id", Metadata, crud_bridges_in_cluster}.
+ };
-operation_apis() ->
- Metadata = #{
+schema("/nodes/:node/bridges/:id/operation/:operation") ->
+ #{
+ operationId => '/nodes/:node/bridges/:id/operation/:operation',
post => #{
+ tags => [<<"bridges">>],
+ summary => <<"Start/Stop/Restart Bridge">>,
description => <<"Start/Stop/Restart bridges on a specific node">>,
parameters => [
param_path_node(),
param_path_id(),
- param_path_operation()],
+ param_path_operation()
+ ],
responses => #{
- <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>,
- ['INTERNAL_ERROR']),
- <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
- {"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}.
-
-param_path_node() ->
- #{
- name => node,
- in => path,
- schema => #{type => string},
- required => true,
- example => node()
+ 500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
+ 200 => <<"Operation success">>
+ }
+ }
}.
-param_path_id() ->
- #{
- name => id,
- in => path,
- schema => #{type => string},
- required => true
- }.
-
-param_path_operation()->
- #{
- name => operation,
- in => path,
- required => true,
- schema => #{
- type => string,
- enum => [start, stop, restart]},
- example => restart
- }.
-
-list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
- ?TRY_PARSE_ID(Id,
- case emqx_bridge:lookup(BridgeType, BridgeName) of
- {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
- {error, not_found} ->
- case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of
- ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201);
- {error, Error} -> {400, Error}
- end
- end);
-list_create_bridges_in_cluster(get, _Params) ->
+'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
+ BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
+ case emqx_bridge:lookup(BridgeType, BridgeName) of
+ {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
+ {error, not_found} ->
+ case ensure_bridge_created(BridgeType, BridgeName, Conf) of
+ ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
+ {error, Error} -> {400, Error}
+ end
+ end;
+'/bridges'(get, _Params) ->
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_local_bridges(Node) when Node =:= node() ->
@@ -238,22 +284,22 @@ list_local_bridges(Node) when Node =:= node() ->
list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]).
-crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) ->
- ?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200));
+'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
+ ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
-crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) ->
+'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
- case ensure_bridge(BridgeType, BridgeName, Conf) of
- ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200);
+ case ensure_bridge_created(BridgeType, BridgeName, Conf) of
+ ok -> lookup_from_all_nodes(BridgeType, BridgeName, 200);
{error, Error} -> {400, Error}
end;
{error, not_found} ->
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
end);
-crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
+'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
#{override_to => cluster}) of
@@ -262,12 +308,12 @@ crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
-lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
+lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
{ok, [{ok, _} | _] = Results} ->
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} ->
- {404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)};
+ {404, error_msg('NOT_FOUND', <<"not_found">>)};
{error, ErrL} ->
{500, error_msg('UNKNOWN_ERROR', ErrL)}
end.
@@ -278,7 +324,8 @@ lookup_from_local_node(BridgeType, BridgeName) ->
Error -> Error
end.
-manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
+'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
+ #{node := Node, id := Id, operation := Op}}) ->
OperFun =
fun (<<"start">>) -> start;
(<<"stop">>) -> stop;
@@ -292,9 +339,10 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
-ensure_bridge(BridgeType, BridgeName, Conf) ->
- case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
- #{override_to => cluster}) of
+ensure_bridge_created(BridgeType, BridgeName, Conf) ->
+ Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
+ case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
+ Conf1, #{override_to => cluster}) of
{ok, _} -> ok;
{error, Reason} ->
{error, error_msg('BAD_ARG', Reason)}
@@ -351,7 +399,7 @@ format_resp(#{id := Id, raw_config := RawConf,
RawConf#{
id => Id,
node => node(),
- bridge_type => emqx_bridge:bridge_type(Mod),
+ type => emqx_bridge:bridge_type(Mod),
status => IsConnected(Status),
metrics => Metrics
}.
@@ -379,3 +427,10 @@ error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
+
+bin(S) when is_atom(S) ->
+ atom_to_binary(S, utf8);
+bin(S) when is_list(S) ->
+ list_to_binary(S);
+bin(S) when is_binary(S) ->
+ S.
diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
new file mode 100644
index 000000000..2bef474bd
--- /dev/null
+++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
@@ -0,0 +1,95 @@
+-module(emqx_bridge_http_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-import(hoconsc, [mk/2, enum/1]).
+
+-export([roots/0, fields/1]).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+roots() -> [].
+
+fields("bridge") ->
+ basic_config() ++
+ [ {url, mk(binary(),
+ #{ nullable => false
+ , desc =>"""
+The URL of the HTTP Bridge.
+Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
+or port part.
+For example, http://localhost:9901/${topic}
is allowed, but
+ http://${host}:9901/message
or http://localhost:${port}/message
+is not allowed.
+"""
+ })}
+ , {from_local_topic, mk(binary(),
+ #{ desc =>"""
+The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
+match the from_local_topic will be forwarded.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
+from_local_topic will be forwarded.
+"""
+ })}
+ , {method, mk(method(),
+ #{ default => post
+ , desc =>"""
+The method of the HTTP request. All the available methods are: post, put, get, delete.
+Template with variables is allowed.
+"""
+ })}
+ , {headers, mk(map(),
+ #{ default => #{
+ <<"accept">> => <<"application/json">>,
+ <<"cache-control">> => <<"no-cache">>,
+ <<"connection">> => <<"keep-alive">>,
+ <<"content-type">> => <<"application/json">>,
+ <<"keep-alive">> => <<"timeout=5">>}
+ , desc =>"""
+The headers of the HTTP request.
+Template with variables is allowed.
+"""
+ })
+ }
+ , {body, mk(binary(),
+ #{ default => <<"${payload}">>
+ , desc =>"""
+The body of the HTTP request.
+Template with variables is allowed.
+"""
+ })}
+ , {request_timeout, mk(emqx_schema:duration_ms(),
+ #{ default => <<"30s">>
+ , desc =>"""
+How long will the HTTP request timeout.
+"""
+ })}
+ ];
+
+fields("post") ->
+ [ type_field()
+ , name_field()
+ ] ++ fields("bridge");
+
+fields("put") ->
+ fields("bridge");
+
+fields("get") ->
+ [ id_field()
+ ] ++ fields("post").
+
+basic_config() ->
+ proplists:delete(base_url, emqx_connector_http:fields(config)).
+
+%%======================================================================================
+id_field() ->
+ {id, mk(binary(), #{desc => "The Bridge Id", example => "http:my_http_bridge"})}.
+
+type_field() ->
+ {type, mk(http, #{desc => "The Bridge Type"})}.
+
+name_field() ->
+ {name, mk(binary(), #{desc => "The Bridge Name"})}.
+
+method() ->
+ enum([post, put, get, delete]).
diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
new file mode 100644
index 000000000..d2cf6b1a8
--- /dev/null
+++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
@@ -0,0 +1,62 @@
+-module(emqx_bridge_mqtt_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-import(hoconsc, [mk/2]).
+
+-export([roots/0, fields/1]).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+roots() -> [].
+
+fields("ingress") ->
+ [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
+ , emqx_bridge_schema:connector_name()
+ ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
+
+fields("egress") ->
+ [ direction(egress, emqx_connector_mqtt_schema:egress_desc())
+ , emqx_bridge_schema:connector_name()
+ ] ++ emqx_connector_mqtt_schema:fields("egress");
+
+fields("post_ingress") ->
+ [ type_field()
+ , name_field()
+ ] ++ fields("ingress");
+fields("post_egress") ->
+ [ type_field()
+ , name_field()
+ ] ++ fields("egress");
+
+fields("put_ingress") ->
+ fields("ingress");
+fields("put_egress") ->
+ fields("egress");
+
+fields("get_ingress") ->
+ [ id_field()
+ ] ++ fields("post_ingress");
+fields("get_egress") ->
+ [ id_field()
+ ] ++ fields("post_egress").
+
+%%======================================================================================
+direction(Dir, Desc) ->
+ {direction, mk(Dir,
+ #{ nullable => false
+ , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
"
+ ++ Desc
+ })}.
+
+id_field() ->
+ {id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}.
+
+type_field() ->
+ {type, mk(mqtt, #{desc => "The Bridge Type"})}.
+
+name_field() ->
+ {name, mk(binary(),
+ #{ desc => "The Bridge Name"
+ , example => "some_bridge_name"
+ })}.
diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl
index 3a3151ef0..2038e6c04 100644
--- a/apps/emqx_bridge/src/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl
@@ -2,105 +2,27 @@
-include_lib("typerefl/include/types.hrl").
+-import(hoconsc, [mk/2, ref/2]).
+
-export([roots/0, fields/1]).
+-export([ get_response/0
+ , put_request/0
+ , post_request/0
+ ]).
+
+-export([ connector_name/0
+ ]).
+
%%======================================================================================
%% Hocon Schema Definitions
-roots() -> [bridges].
+-define(CONN_TYPES, [mqtt]).
-fields(bridges) ->
- [ {mqtt,
- sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge")
- , ref("egress_mqtt_bridge")
- ])),
- #{ desc => "MQTT bridges"
- })}
- , {http,
- sc(hoconsc:map(name, ref("http_bridge")),
- #{ desc => "HTTP bridges"
- })}
- ];
-
-fields("ingress_mqtt_bridge") ->
- [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
- , connector_name()
- ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
-
-fields("egress_mqtt_bridge") ->
- [ direction(egress, emqx_connector_mqtt_schema:egress_desc())
- , connector_name()
- ] ++ emqx_connector_mqtt_schema:fields("egress");
-
-fields("http_bridge") ->
- basic_config_http() ++
- [ {url,
- sc(binary(),
- #{ nullable => false
- , desc =>"""
-The URL of the HTTP Bridge.
-Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
-or port part.
-For example, http://localhost:9901/${topic}
is allowed, but
- http://${host}:9901/message
or http://localhost:${port}/message
-is not allowed.
-"""
- })}
- , {from_local_topic,
- sc(binary(),
- #{ desc =>"""
-The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
-match the from_local_topic will be forwarded.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
-from_local_topic will be forwarded.
-"""
- })}
- , {method,
- sc(method(),
- #{ default => post
- , desc =>"""
-The method of the HTTP request. All the available methods are: post, put, get, delete.
-Template with variables is allowed.
-"""
- })}
- , {headers,
- sc(map(),
- #{ default => #{
- <<"accept">> => <<"application/json">>,
- <<"cache-control">> => <<"no-cache">>,
- <<"connection">> => <<"keep-alive">>,
- <<"content-type">> => <<"application/json">>,
- <<"keep-alive">> => <<"timeout=5">>}
- , desc =>"""
-The headers of the HTTP request.
-Template with variables is allowed.
-"""
- })
- }
- , {body,
- sc(binary(),
- #{ default => <<"${payload}">>
- , desc =>"""
-The body of the HTTP request.
-Template with variables is allowed.
-"""
- })}
- , {request_timeout,
- sc(emqx_schema:duration_ms(),
- #{ default => <<"30s">>
- , desc =>"""
-How long will the HTTP request timeout.
-"""
- })}
- ].
-
-direction(Dir, Desc) ->
- {direction,
- sc(Dir,
- #{ nullable => false
- , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++
- Desc
- })}.
+%%======================================================================================
+%% For HTTP APIs
+get_response() ->
+ http_schema("get").
connector_name() ->
{connector,
@@ -108,17 +30,35 @@ connector_name() ->
#{ nullable => false
, desc =>"""
The connector name to be used for this bridge.
-Connectors are configured as 'connectors.type.name',
+Connectors are configured as 'connectors.{type}.{name}',
for example 'connectors.http.mybridge'.
"""
})}.
-basic_config_http() ->
- proplists:delete(base_url, emqx_connector_http:fields(config)).
+put_request() ->
+ http_schema("put").
-method() ->
- hoconsc:enum([post, put, get, delete]).
+post_request() ->
+ http_schema("post").
-sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+http_schema(Method) ->
+ Schemas = lists:flatmap(fun(Type) ->
+ [ref(schema_mod(Type), Method ++ "_ingress"),
+ ref(schema_mod(Type), Method ++ "_egress")]
+ end, ?CONN_TYPES),
+ hoconsc:union([ref(emqx_bridge_http_schema, Method)
+ | Schemas]).
-ref(Field) -> hoconsc:ref(?MODULE, Field).
+%%======================================================================================
+%% For config files
+roots() -> [bridges].
+
+fields(bridges) ->
+ [{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}]
+ ++ [{T, mk(hoconsc:map(name, hoconsc:union([
+ ref(schema_mod(T), "ingress"),
+ ref(schema_mod(T), "egress")
+ ])), #{})} || T <- ?CONN_TYPES].
+
+schema_mod(Type) ->
+ list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index bc865906f..1510f439e 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -62,7 +62,7 @@ post_request_body_schema() ->
connector_info(post_req), connector_info_examples()).
get_response_body_schema() ->
- emqx_dashboard_swagger:schema_with_example(
+ emqx_dashboard_swagger:schema_with_examples(
connector_info(), connector_info_examples()).
connector_info() ->
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 190456262..9d11d7ac0 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -100,7 +100,7 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
- clientid => clientid(maps:get(clientid, Conf, InstanceId)),
+ clientid => clientid(maps:get(clientid, Conf, InstId)),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
@@ -190,4 +190,4 @@ basic_config(#{
}.
clientid(Id) ->
- unicode:characters_to_binary([Id, ":", atom_to_list(node())], utf8).
+ iolist_to_binary([Id, ":", atom_to_list(node())]).