Merge pull request #6342 from terry-xiaoyu/improve_rule_api_swagger

Improve HTTP APIs for /rules, /bridges and /connectors
This commit is contained in:
Shawn 2021-12-07 11:13:19 +08:00 committed by GitHub
commit 51f772aade
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 974 additions and 653 deletions

View File

@ -17,220 +17,266 @@
-behaviour(minirest_api).
-export([api_spec/0]).
-include_lib("typerefl/include/types.hrl").
-export([ list_create_bridges_in_cluster/2
, list_local_bridges/1
, crud_bridges_in_cluster/2
, manage_bridges/2
-import(hoconsc, [mk/2, array/1, enum/1]).
%% Swagger specs from hocon schema
-export([api_spec/0, paths/0, schema/1, namespace/0]).
%% API callbacks
-export(['/bridges'/2, '/bridges/:id'/2,
'/nodes/:node/bridges/:id/operation/:operation'/2]).
-export([ list_local_bridges/1
, lookup_from_local_node/2
]).
-define(TYPES, [mqtt, http]).
-define(CONN_TYPES, [mqtt]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
{BridgeType, BridgeName} -> EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
". Bridge ID must be of format 'bridge_type:name'">>}}
". Bridge Ids must be of format {type}:{name}">>}}
end).
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{ matched => MATCH,
success => SUCC,
failed => FAILED,
speed => RATE,
speed_last5m => RATE_5,
speed_max => RATE_MAX
rate => RATE,
rate_last5m => RATE_5,
rate_max => RATE_MAX
}).
-define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{ matched := MATCH,
success := SUCC,
failed := FAILED,
speed := RATE,
speed_last5m := RATE_5,
speed_max := RATE_MAX
rate := RATE,
rate_last5m := RATE_5,
rate_max := RATE_MAX
}).
req_schema() ->
Schema = [
case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of
%% the bridge is not configured, so we have no method to get the schema
[] -> #{};
[{_K, Conf} | _] ->
emqx_mgmt_api_configs:gen_schema(Conf)
end
|| T <- ?TYPES],
#{'oneOf' => Schema}.
node_schema() ->
#{type => string, example => "emqx@127.0.0.1"}.
status_schema() ->
#{type => string, enum => [connected, disconnected]}.
metrics_schema() ->
#{ type => object
, properties => #{
matched => #{type => integer, example => "0"},
success => #{type => integer, example => "0"},
failed => #{type => integer, example => "0"},
speed => #{type => number, format => float, example => "0.0"},
speed_last5m => #{type => number, format => float, example => "0.0"},
speed_max => #{type => number, format => float, example => "0.0"}
}
}.
per_node_schema(Key, Schema) ->
#{
type => array,
items => #{
type => object,
properties => #{
node => node_schema(),
Key => Schema
}
}
}.
resp_schema() ->
AddMetadata = fun(Prop) ->
Prop#{status => status_schema(),
node_status => per_node_schema(status, status_schema()),
metrics => metrics_schema(),
node_metrics => per_node_schema(metrics, metrics_schema()),
id => #{type => string, example => "http:my_http_bridge"},
bridge_type => #{type => string, enum => ?TYPES},
node => node_schema()
}
end,
more_props_resp_schema(AddMetadata).
more_props_resp_schema(AddMetadata) ->
#{'oneOf' := Schema} = req_schema(),
Schema1 = [S#{properties => AddMetadata(Prop)}
|| S = #{properties := Prop} <- Schema],
#{'oneOf' => Schema1}.
namespace() -> "bridge".
api_spec() ->
{bridge_apis(), []}.
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
bridge_apis() ->
[list_all_bridges_api(), crud_bridges_apis(), operation_apis()].
paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"].
list_all_bridges_api() ->
ReqSchema = more_props_resp_schema(fun(Prop) ->
Prop#{id => #{type => string, required => true}}
end),
RespSchema = resp_schema(),
Metadata = #{
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
, {message, mk(string(), #{example => Message})}
].
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
bridge_info_examples(get)).
param_path_node() ->
path_param(node, binary(), atom_to_binary(node(), utf8)).
param_path_operation() ->
path_param(operation, enum([start, stop, restart]), <<"start">>).
param_path_id() ->
path_param(id, binary(), <<"http:my_http_bridge">>).
path_param(Name, Type, Example) ->
{Name, mk(Type,
#{ in => path
, required => true
, example => Example
})}.
bridge_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
bridge_info_examples(Method) ->
maps:merge(conn_bridge_examples(Method), #{
<<"http_bridge">> => #{
summary => <<"HTTP Bridge">>,
value => info_example(http, awesome, Method)
}
}).
conn_bridge_examples(Method) ->
lists:foldl(fun(Type, Acc) ->
SType = atom_to_list(Type),
KeyIngress = bin(SType ++ "_ingress"),
KeyEgress = bin(SType ++ "_egress"),
maps:merge(Acc, #{
KeyIngress => #{
summary => bin(string:uppercase(SType) ++ " Ingress Bridge"),
value => info_example(Type, ingress, Method)
},
KeyEgress => #{
summary => bin(string:uppercase(SType) ++ " Egress Bridge"),
value => info_example(Type, egress, Method)
}
})
end, #{}, ?CONN_TYPES).
info_example(Type, Direction, Method) ->
maps:merge(info_example_basic(Type, Direction),
method_example(Type, Direction, Method)).
method_example(Type, Direction, get) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
#{
id => bin(SType ++ ":" ++ SName),
type => bin(SType),
name => bin(SName)
};
method_example(Type, Direction, post) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
#{
type => bin(SType),
name => bin(SName)
};
method_example(_Type, _Direction, put) ->
#{}.
info_example_basic(http, _) ->
#{
url => <<"http://localhost:9901/messages/${topic}">>,
request_timeout => <<"30s">>,
connect_timeout => <<"30s">>,
max_retries => 3,
retry_interval => <<"10s">>,
pool_type => <<"random">>,
pool_size => 4,
enable_pipelining => true,
ssl => #{enable => false},
from_local_topic => <<"emqx_http/#">>,
method => post,
body => <<"${payload}">>
};
info_example_basic(mqtt, ingress) ->
#{
connector => <<"mqtt:my_mqtt_connector">>,
direction => ingress,
from_remote_topic => <<"aws/#">>,
subscribe_qos => 1,
to_local_topic => <<"from_aws/${topic}">>,
payload => <<"${payload}">>,
qos => <<"${qos}">>,
retain => <<"${retain}">>
};
info_example_basic(mqtt, egress) ->
#{
connector => <<"mqtt:my_mqtt_connector">>,
direction => egress,
from_local_topic => <<"emqx/#">>,
to_remote_topic => <<"from_emqx/${topic}">>,
payload => <<"${payload}">>,
qos => 1,
retain => false
}.
schema("/bridges") ->
#{
operationId => '/bridges',
get => #{
tags => [<<"bridges">>],
summary => <<"List Bridges">>,
description => <<"List all created bridges">>,
responses => #{
<<"200">> => emqx_mgmt_util:array_schema(resp_schema(),
<<"A list of the bridges">>)
200 => emqx_dashboard_swagger:schema_with_example(
array(emqx_bridge_schema:get_response()),
bridge_info_array_example(get))
}
},
post => #{
tags => [<<"bridges">>],
summary => <<"Create Bridge">>,
description => <<"Create a new bridge">>,
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
requestBody => emqx_dashboard_swagger:schema_with_examples(
emqx_bridge_schema:post_request(),
bridge_info_examples(post)),
responses => #{
<<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>),
<<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>,
['UPDATE_FAILED'])
201 => get_response_body_schema(),
400 => error_schema('BAD_ARG', "Create bridge failed")
}
}
},
{"/bridges/", Metadata, list_create_bridges_in_cluster}.
};
crud_bridges_apis() ->
ReqSchema = req_schema(),
RespSchema = resp_schema(),
Metadata = #{
schema("/bridges/:id") ->
#{
operationId => '/bridges/:id',
get => #{
tags => [<<"bridges">>],
summary => <<"Get Bridge">>,
description => <<"Get a bridge by Id">>,
parameters => [param_path_id()],
responses => #{
<<"200">> => emqx_mgmt_util:array_schema(RespSchema,
<<"The details of the bridge">>),
<<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
200 => get_response_body_schema(),
404 => error_schema('NOT_FOUND', "Bridge not found")
}
},
put => #{
tags => [<<"bridges">>],
summary => <<"Update Bridge">>,
description => <<"Update a bridge">>,
parameters => [param_path_id()],
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
requestBody => emqx_dashboard_swagger:schema_with_examples(
emqx_bridge_schema:put_request(),
bridge_info_examples(put)),
responses => #{
<<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>),
<<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>,
['UPDATE_FAILED'])
200 => get_response_body_schema(),
400 => error_schema('BAD_ARG', "Update bridge failed")
}
},
delete => #{
tags => [<<"bridges">>],
summary => <<"Delete Bridge">>,
description => <<"Delete a bridge">>,
parameters => [param_path_id()],
responses => #{
<<"204">> => emqx_mgmt_util:schema(<<"Bridge deleted">>),
<<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
204 => <<"Bridge deleted">>
}
}
},
{"/bridges/:id", Metadata, crud_bridges_in_cluster}.
};
operation_apis() ->
Metadata = #{
schema("/nodes/:node/bridges/:id/operation/:operation") ->
#{
operationId => '/nodes/:node/bridges/:id/operation/:operation',
post => #{
tags => [<<"bridges">>],
summary => <<"Start/Stop/Restart Bridge">>,
description => <<"Start/Stop/Restart bridges on a specific node">>,
parameters => [
param_path_node(),
param_path_id(),
param_path_operation()],
param_path_operation()
],
responses => #{
<<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>,
['INTERNAL_ERROR']),
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
{"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}.
param_path_node() ->
#{
name => node,
in => path,
schema => #{type => string},
required => true,
example => node()
500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
200 => <<"Operation success">>
}
}
}.
param_path_id() ->
#{
name => id,
in => path,
schema => #{type => string},
required => true
}.
param_path_operation()->
#{
name => operation,
in => path,
required => true,
schema => #{
type => string,
enum => [start, stop, restart]},
example => restart
}.
list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
{error, not_found} ->
case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of
ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201);
{error, Error} -> {400, Error}
end
end);
list_create_bridges_in_cluster(get, _Params) ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
{error, not_found} ->
case ensure_bridge_created(BridgeType, BridgeName, Conf) of
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
{error, Error} -> {400, Error}
end
end;
'/bridges'(get, _Params) ->
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_local_bridges(Node) when Node =:= node() ->
@ -238,22 +284,22 @@ list_local_bridges(Node) when Node =:= node() ->
list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]).
crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200));
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) ->
'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
case ensure_bridge(BridgeType, BridgeName, Conf) of
ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200);
case ensure_bridge_created(BridgeType, BridgeName, Conf) of
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 200);
{error, Error} -> {400, Error}
end;
{error, not_found} ->
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
end);
crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
#{override_to => cluster}) of
@ -262,12 +308,12 @@ crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
{ok, [{ok, _} | _] = Results} ->
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} ->
{404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)};
{404, error_msg('NOT_FOUND', <<"not_found">>)};
{error, ErrL} ->
{500, error_msg('UNKNOWN_ERROR', ErrL)}
end.
@ -278,7 +324,8 @@ lookup_from_local_node(BridgeType, BridgeName) ->
Error -> Error
end.
manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
#{node := Node, id := Id, operation := Op}}) ->
OperFun =
fun (<<"start">>) -> start;
(<<"stop">>) -> stop;
@ -292,9 +339,10 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
ensure_bridge(BridgeType, BridgeName, Conf) ->
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
#{override_to => cluster}) of
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
Conf1, #{override_to => cluster}) of
{ok, _} -> ok;
{error, Reason} ->
{error, error_msg('BAD_ARG', Reason)}
@ -346,12 +394,14 @@ aggregate_metrics(AllMetrics) ->
end, InitMetrics, AllMetrics).
format_resp(#{id := Id, raw_config := RawConf,
resource_data := #{mod := Mod, status := Status, metrics := Metrics}}) ->
resource_data := #{status := Status, metrics := Metrics}}) ->
{Type, Name} = emqx_bridge:parse_bridge_id(Id),
IsConnected = fun(started) -> connected; (_) -> disconnected end,
RawConf#{
id => Id,
type => Type,
name => Name,
node => node(),
bridge_type => emqx_bridge:bridge_type(Mod),
status => IsConnected(Status),
metrics => Metrics
}.
@ -378,4 +428,7 @@ rpc_call(Node, Mod, Fun, Args) ->
error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
bin(S) when is_list(S) ->
list_to_binary(S).

View File

@ -0,0 +1,95 @@
-module(emqx_bridge_http_schema).
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2, enum/1]).
-export([roots/0, fields/1]).
%%======================================================================================
%% Hocon Schema Definitions
roots() -> [].
fields("bridge") ->
basic_config() ++
[ {url, mk(binary(),
#{ nullable => false
, desc =>"""
The URL of the HTTP Bridge.<br>
Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
or port part.<br>
For example, <code> http://localhost:9901/${topic} </code> is allowed, but
<code> http://${host}:9901/message </code> or <code> http://localhost:${port}/message </code>
is not allowed.
"""
})}
, {from_local_topic, mk(binary(),
#{ desc =>"""
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
match the from_local_topic will be forwarded.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
"""
})}
, {method, mk(method(),
#{ default => post
, desc =>"""
The method of the HTTP request. All the available methods are: post, put, get, delete.<br>
Template with variables is allowed.<br>
"""
})}
, {headers, mk(map(),
#{ default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>}
, desc =>"""
The headers of the HTTP request.<br>
Template with variables is allowed.
"""
})
}
, {body, mk(binary(),
#{ default => <<"${payload}">>
, desc =>"""
The body of the HTTP request.<br>
Template with variables is allowed.
"""
})}
, {request_timeout, mk(emqx_schema:duration_ms(),
#{ default => <<"30s">>
, desc =>"""
How long will the HTTP request timeout.
"""
})}
];
fields("post") ->
[ type_field()
, name_field()
] ++ fields("bridge");
fields("put") ->
fields("bridge");
fields("get") ->
[ id_field()
] ++ fields("post").
basic_config() ->
proplists:delete(base_url, emqx_connector_http:fields(config)).
%%======================================================================================
id_field() ->
{id, mk(binary(), #{desc => "The Bridge Id", example => "http:my_http_bridge"})}.
type_field() ->
{type, mk(http, #{desc => "The Bridge Type"})}.
name_field() ->
{name, mk(binary(), #{desc => "The Bridge Name"})}.
method() ->
enum([post, put, get, delete]).

View File

@ -0,0 +1,62 @@
-module(emqx_bridge_mqtt_schema).
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2]).
-export([roots/0, fields/1]).
%%======================================================================================
%% Hocon Schema Definitions
roots() -> [].
fields("ingress") ->
[ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
, emqx_bridge_schema:connector_name()
] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress") ->
[ direction(egress, emqx_connector_mqtt_schema:egress_desc())
, emqx_bridge_schema:connector_name()
] ++ emqx_connector_mqtt_schema:fields("egress");
fields("post_ingress") ->
[ type_field()
, name_field()
] ++ fields("ingress");
fields("post_egress") ->
[ type_field()
, name_field()
] ++ fields("egress");
fields("put_ingress") ->
fields("ingress");
fields("put_egress") ->
fields("egress");
fields("get_ingress") ->
[ id_field()
] ++ fields("post_ingress");
fields("get_egress") ->
[ id_field()
] ++ fields("post_egress").
%%======================================================================================
direction(Dir, Desc) ->
{direction, mk(Dir,
#{ nullable => false
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>"
++ Desc
})}.
id_field() ->
{id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}.
type_field() ->
{type, mk(mqtt, #{desc => "The Bridge Type"})}.
name_field() ->
{name, mk(binary(),
#{ desc => "The Bridge Name"
, example => "some_bridge_name"
})}.

View File

@ -2,123 +2,63 @@
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2, ref/2]).
-export([roots/0, fields/1]).
-export([ get_response/0
, put_request/0
, post_request/0
]).
-export([ connector_name/0
]).
%%======================================================================================
%% Hocon Schema Definitions
roots() -> [bridges].
-define(CONN_TYPES, [mqtt]).
fields(bridges) ->
[ {mqtt,
sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge")
, ref("egress_mqtt_bridge")
])),
#{ desc => "MQTT bridges"
})}
, {http,
sc(hoconsc:map(name, ref("http_bridge")),
#{ desc => "HTTP bridges"
})}
];
fields("ingress_mqtt_bridge") ->
[ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
, connector_name()
] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress_mqtt_bridge") ->
[ direction(egress, emqx_connector_mqtt_schema:egress_desc())
, connector_name()
] ++ emqx_connector_mqtt_schema:fields("egress");
fields("http_bridge") ->
basic_config_http() ++
[ {url,
sc(binary(),
#{ nullable => false
, desc =>"""
The URL of the HTTP Bridge.<br>
Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
or port part.<br>
For example, <code> http://localhost:9901/${topic} </code> is allowed, but
<code> http://${host}:9901/message </code> or <code> http://localhost:${port}/message </code>
is not allowed.
"""
})}
, {from_local_topic,
sc(binary(),
#{ desc =>"""
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
match the from_local_topic will be forwarded.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
"""
})}
, {method,
sc(method(),
#{ default => post
, desc =>"""
The method of the HTTP request. All the available methods are: post, put, get, delete.<br>
Template with variables is allowed.<br>
"""
})}
, {headers,
sc(map(),
#{ default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>}
, desc =>"""
The headers of the HTTP request.<br>
Template with variables is allowed.
"""
})
}
, {body,
sc(binary(),
#{ default => <<"${payload}">>
, desc =>"""
The body of the HTTP request.<br>
Template with variables is allowed.
"""
})}
, {request_timeout,
sc(emqx_schema:duration_ms(),
#{ default => <<"30s">>
, desc =>"""
How long will the HTTP request timeout.
"""
})}
].
direction(Dir, Desc) ->
{direction,
sc(Dir,
#{ nullable => false
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>" ++
Desc
})}.
%%======================================================================================
%% For HTTP APIs
get_response() ->
http_schema("get").
connector_name() ->
{connector,
sc(binary(),
mk(binary(),
#{ nullable => false
, desc =>"""
The connector name to be used for this bridge.
Connectors are configured as 'connectors.type.name',
Connectors are configured as 'connectors.{type}.{name}',
for example 'connectors.http.mybridge'.
"""
})}.
basic_config_http() ->
proplists:delete(base_url, emqx_connector_http:fields(config)).
put_request() ->
http_schema("put").
method() ->
hoconsc:enum([post, put, get, delete]).
post_request() ->
http_schema("post").
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
http_schema(Method) ->
Schemas = lists:flatmap(fun(Type) ->
[ref(schema_mod(Type), Method ++ "_ingress"),
ref(schema_mod(Type), Method ++ "_egress")]
end, ?CONN_TYPES),
hoconsc:union([ref(emqx_bridge_http_schema, Method)
| Schemas]).
ref(Field) -> hoconsc:ref(?MODULE, Field).
%%======================================================================================
%% For config files
roots() -> [bridges].
fields(bridges) ->
[{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}]
++ [{T, mk(hoconsc:map(name, hoconsc:union([
ref(schema_mod(T), "ingress"),
ref(schema_mod(T), "egress")
])), #{})} || T <- ?CONN_TYPES].
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).

View File

@ -21,7 +21,9 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>).
-define(TEST_ID, <<"http:test_bridge">>).
-define(BRIDGE_TYPE, <<"http">>).
-define(BRIDGE_NAME, <<"test_bridge">>).
-define(BRIDGE_ID, <<"http:test_bridge">>).
-define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))).
@ -134,11 +136,15 @@ t_http_crud_apis(_) ->
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
?HTTP_BRIDGE(URL1)#{
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@ -148,7 +154,10 @@ t_http_crud_apis(_) ->
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
?HTTP_BRIDGE(URL1)#{
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge already exists">>
@ -156,10 +165,11 @@ t_http_crud_apis(_) ->
%% update the request-path of the bridge
URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(put, uri(["bridges", ?TEST_ID]),
{ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]),
?HTTP_BRIDGE(URL2)),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@ -169,8 +179,9 @@ t_http_crud_apis(_) ->
%% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
?assertMatch([#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
?assertMatch([#{ <<"id">> := ?BRIDGE_ID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@ -179,9 +190,10 @@ t_http_crud_apis(_) ->
}], jsx:decode(Bridge2Str)),
%% get the bridge by id
{ok, 200, Bridge3Str} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
{ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@ -190,11 +202,11 @@ t_http_crud_apis(_) ->
}, jsx:decode(Bridge3Str)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% update a deleted bridge returns an error
{ok, 404, ErrMsg2} = request(put, uri(["bridges", ?TEST_ID]),
{ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]),
?HTTP_BRIDGE(URL2)),
?assertMatch(
#{ <<"code">> := _
@ -206,11 +218,15 @@ t_start_stop_bridges(_) ->
Port = start_http_server(fun handle_fun_200_ok/1),
URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
?HTTP_BRIDGE(URL1)#{
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
%ct:pal("the bridge ==== ~p", [Bridge]),
?assertMatch(
#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
#{ <<"id">> := ?BRIDGE_ID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@ -219,42 +235,42 @@ t_start_stop_bridges(_) ->
}, jsx:decode(Bridge)),
%% stop it
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
<<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
{ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "start"]),
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "start"]),
<<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
<<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
<<"">>),
%% restart a stopped bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
<<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
{ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%--------------------------------------------------------------------

View File

@ -1,4 +1,5 @@
#connectors.mqtt.my_mqtt_connector {
# mode = cluster_shareload
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# username = "username1"
@ -8,7 +9,6 @@
# retry_interval = "30s"
# max_inflight = 32
# reconnect_interval = "30s"
# bridge_mode = true
# replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB"

View File

@ -30,6 +30,8 @@
%% API callbacks
-export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]).
-define(CONN_TYPES, [mqtt]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_connector:parse_connector_id(Id) of
{ConnType, ConnName} ->
@ -38,7 +40,7 @@
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
". Bridge ID must be of format 'bridge_type:name'">>}}
". Bridge Ids must be of format {type}:{name}">>}}
end).
namespace() -> "connector".
@ -53,17 +55,71 @@ error_schema(Code, Message) ->
, {message, mk(string(), #{example => Message})}
].
connector_info() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info")
]).
put_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:put_request(), connector_info_examples(put)).
connector_test_info() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_test_info")
]).
post_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:post_request(), connector_info_examples(post)).
connector_req() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector")
]).
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:get_response(), connector_info_examples(get)).
connector_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(connector_info_examples(Method))].
connector_info_examples(Method) ->
lists:foldl(fun(Type, Acc) ->
SType = atom_to_list(Type),
maps:merge(Acc, #{
Type => #{
summary => bin(string:uppercase(SType) ++ " Connector"),
value => info_example(Type, Method)
}
})
end, #{}, ?CONN_TYPES).
info_example(Type, Method) ->
maps:merge(info_example_basic(Type),
method_example(Type, Method)).
method_example(Type, get) ->
SType = atom_to_list(Type),
SName = "my_" ++ SType ++ "_connector",
#{
id => bin(SType ++ ":" ++ SName),
type => bin(SType),
name => bin(SName)
};
method_example(Type, post) ->
SType = atom_to_list(Type),
SName = "my_" ++ SType ++ "_connector",
#{
type => bin(SType),
name => bin(SName)
};
method_example(_Type, put) ->
#{}.
info_example_basic(mqtt) ->
#{
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"30s">>,
proto_ver => <<"v4">>,
username => <<"foo">>,
password => <<"bar">>,
clientid => <<"foo">>,
clean_start => true,
keepalive => <<"300s">>,
retry_interval => <<"30s">>,
max_inflight => 100,
ssl => #{
enable => false
}
}.
param_path_id() ->
[{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}].
@ -74,9 +130,9 @@ schema("/connectors_test") ->
post => #{
tags => [<<"connectors">>],
description => <<"Test creating a new connector by given Id <br>"
"The ID must be of format 'type:name'">>,
"The ID must be of format '{type}:{name}'">>,
summary => <<"Test creating connector">>,
requestBody => connector_test_info(),
requestBody => post_request_body_schema(),
responses => #{
200 => <<"Test connector OK">>,
400 => error_schema('TEST_FAILED', "connector test failed")
@ -92,17 +148,19 @@ schema("/connectors") ->
description => <<"List all connectors">>,
summary => <<"List connectors">>,
responses => #{
200 => mk(array(connector_info()), #{desc => "List of connectors"})
200 => emqx_dashboard_swagger:schema_with_example(
array(emqx_connector_schema:get_response()),
connector_info_array_example(get))
}
},
post => #{
tags => [<<"connectors">>],
description => <<"Create a new connector by given Id <br>"
"The ID must be of format 'type:name'">>,
"The ID must be of format '{type}:{name}'">>,
summary => <<"Create connector">>,
requestBody => connector_info(),
requestBody => post_request_body_schema(),
responses => #{
201 => connector_info(),
201 => get_response_body_schema(),
400 => error_schema('ALREADY_EXISTS', "connector already exists")
}
}
@ -117,7 +175,7 @@ schema("/connectors/:id") ->
summary => <<"Get connector">>,
parameters => param_path_id(),
responses => #{
200 => connector_info(),
200 => get_response_body_schema(),
404 => error_schema('NOT_FOUND', "Connector not found")
}
},
@ -126,9 +184,9 @@ schema("/connectors/:id") ->
description => <<"Update an existing connector by Id">>,
summary => <<"Update connector">>,
parameters => param_path_id(),
requestBody => connector_req(),
requestBody => put_request_body_schema(),
responses => #{
200 => <<"Update connector successfully">>,
200 => get_response_body_schema(),
400 => error_schema('UPDATE_FAIL', "Update failed"),
404 => error_schema('NOT_FOUND', "Connector not found")
}},
@ -143,8 +201,8 @@ schema("/connectors/:id") ->
}}
}.
'/connectors_test'(post, #{body := #{<<"bridge_type">> := ConnType} = Params}) ->
case emqx_connector:create_dry_run(ConnType, maps:remove(<<"bridge_type">>, Params)) of
'/connectors_test'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
case emqx_connector:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of
ok -> {200};
{error, Error} ->
{400, error_msg('BAD_ARG', Error)}
@ -153,17 +211,20 @@ schema("/connectors/:id") ->
'/connectors'(get, _Request) ->
{200, emqx_connector:list()};
'/connectors'(post, #{body := #{<<"id">> := Id} = Params}) ->
?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
{error, not_found} ->
case emqx_connector:update(ConnType, ConnName, maps:remove(<<"id">>, Params)) of
{ok, #{raw_config := RawConf}} -> {201, RawConf#{<<"id">> => Id}};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end
end).
'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()),
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
{error, not_found} ->
case emqx_connector:update(ConnType, ConnName,
maps:without([<<"type">>, <<"name">>], Params)) of
{ok, #{raw_config := RawConf}} ->
{201, RawConf#{<<"id">> =>
emqx_connector:connector_id(ConnType, ConnName)}};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end
end.
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
@ -200,4 +261,7 @@ schema("/connectors/:id") ->
error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
bin(S) when is_list(S) ->
list_to_binary(S).

View File

@ -40,6 +40,8 @@
-behaviour(hocon_schema).
-import(hoconsc, [mk/2]).
-export([ roots/0
, fields/1]).
@ -49,7 +51,25 @@ roots() ->
fields("config").
fields("config") ->
emqx_connector_mqtt_schema:fields("config").
emqx_connector_mqtt_schema:fields("config");
fields("get") ->
[{id, mk(binary(),
#{ desc => "The connector Id"
, example => <<"mqtt:my_mqtt_connector">>
})}]
++ fields("post");
fields("put") ->
emqx_connector_mqtt_schema:fields("connector");
fields("post") ->
[ {type, mk(mqtt, #{desc => "The Connector Type"})}
, {name, mk(binary(),
#{ desc => "The Connector Name"
, example => <<"my_mqtt_connector">>
})}
] ++ fields("put").
%% ===================================================================
%% supervisor APIs
@ -100,7 +120,7 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
clientid => clientid(InstanceId),
clientid => clientid(maps:get(clientid, Conf, InstId)),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
@ -162,7 +182,6 @@ basic_config(#{
server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMod,
username := User,
password := Password,
clean_start := CleanStart,
@ -177,7 +196,7 @@ basic_config(#{
server => Server,
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
bridge_mode => BridgeMod,
bridge_mode => true,
username => User,
password => Password,
clean_start => CleanStart,
@ -190,4 +209,4 @@ basic_config(#{
}.
clientid(Id) ->
list_to_binary(lists:concat([Id, ":", node()])).
iolist_to_binary([Id, ":", atom_to_list(node())]).

View File

@ -4,8 +4,33 @@
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2, ref/2]).
-export([roots/0, fields/1]).
-export([ get_response/0
, put_request/0
, post_request/0
]).
-define(CONN_TYPES, [mqtt]).
%%======================================================================================
%% For HTTP APIs
get_response() ->
http_schema("get").
put_request() ->
http_schema("put").
post_request() ->
http_schema("post").
http_schema(Method) ->
Schemas = [ref(schema_mod(Type), Method) || Type <- ?CONN_TYPES],
hoconsc:union(Schemas).
%%======================================================================================
%% Hocon Schema Definitions
@ -14,24 +39,12 @@ roots() -> ["connectors"].
fields(connectors) -> fields("connectors");
fields("connectors") ->
[ {mqtt,
sc(hoconsc:map(name,
hoconsc:union([ ref("mqtt_connector")
mk(hoconsc:map(name,
hoconsc:union([ ref(emqx_connector_mqtt_schema, "connector")
])),
#{ desc => "MQTT bridges"
})}
];
].
fields("mqtt_connector") ->
emqx_connector_mqtt_schema:fields("connector");
fields("mqtt_connector_info") ->
[{id, sc(binary(), #{desc => "The connector Id"})}]
++ fields("mqtt_connector");
fields("mqtt_connector_test_info") ->
[{bridge_type, sc(mqtt, #{desc => "The Bridge Type"})}]
++ fields("mqtt_connector").
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_connector_", Type])).

View File

@ -8,7 +8,7 @@
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% cluster_shareload under the License is cluster_shareload on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
@ -38,7 +38,24 @@ fields("config") ->
topic_mappings();
fields("connector") ->
[ {server,
[ {mode,
sc(hoconsc:enum([cluster_singleton, cluster_shareload]),
#{ default => cluster_shareload
, desc => """
The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'<br>
- cluster_singleton: create an unique MQTT connection within the emqx cluster.<br>
In 'cluster_singleton' node, all messages toward the remote broker go through the same
MQTT connection.<br>
- cluster_shareload: create an MQTT connection on each node in the emqx cluster.<br>
In 'cluster_shareload' mode, the incomming load from the remote broker is shared by
using shared subscription.<br>
Note that the 'clientid' is suffixed by the node name, this is to avoid
clientid conflicts between different nodes. And we can only use shared subscription
topic filters for 'from_remote_topic'.
"""
})}
, {server,
sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker"
@ -49,11 +66,6 @@ fields("connector") ->
#{ default => v4
, desc => "The MQTT protocol version"
})}
, {bridge_mode,
sc(boolean(),
#{ default => true
, desc => "The bridge mode of the MQTT protocol"
})}
, {username,
sc(binary(),
#{ default => "emqx"
@ -66,8 +78,7 @@ fields("connector") ->
})}
, {clientid,
sc(binary(),
#{ default => "emqx_${nodename}"
, desc => "The clientid of the MQTT protocol"
#{ desc => "The clientid of the MQTT protocol"
})}
, {clean_start,
sc(boolean(),

View File

@ -24,7 +24,11 @@
-define(CONF_DEFAULT, <<"connectors: {}">>).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>).
-define(CONNECTR_ID, <<"mqtt:test_connector">>).
-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>).
-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>).
-define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
-define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
-define(MQTT_CONNECOTR(Username),
@ -63,8 +67,8 @@
-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX),
#{<<"matched">> := MATCH, <<"success">> := SUCC,
<<"failed">> := FAILED, <<"speed">> := SPEED,
<<"speed_last5m">> := SPEED5M, <<"speed_max">> := SPEEDMAX}).
<<"failed">> := FAILED, <<"rate">> := SPEED,
<<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -115,7 +119,9 @@ t_mqtt_crud_apis(_) ->
%% POST /connectors/ will create a connector
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@ -128,7 +134,9 @@ t_mqtt_crud_apis(_) ->
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"connector already exists">>
@ -187,7 +195,9 @@ t_mqtt_conn_bridge_ingress(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@ -201,11 +211,14 @@ t_mqtt_conn_bridge_ingress(_) ->
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_INGRESS}),
?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS
}),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
, <<"bridge_type">> := <<"mqtt">>
, <<"type">> := <<"mqtt">>
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
@ -250,7 +263,9 @@ t_mqtt_conn_bridge_egress(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@ -264,11 +279,15 @@ t_mqtt_conn_bridge_egress(_) ->
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
, <<"bridge_type">> := <<"mqtt">>
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
@ -322,7 +341,10 @@ t_mqtt_conn_update(_) ->
%% then we add a mqtt connector, using POST
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"id">> => ?CONNECTR_ID}),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@ -332,9 +354,13 @@ t_mqtt_conn_update(_) ->
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
, <<"bridge_type">> := <<"mqtt">>
, <<"type">> := <<"mqtt">>
, <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
@ -358,9 +384,15 @@ t_mqtt_conn_testing(_) ->
%% APIs for testing the connectivity
%% then we add a mqtt connector, using POST
{ok, 200, <<>>} = request(post, uri(["connectors_test"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"bridge_type">> => <<"mqtt">>}),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
{ok, 400, _} = request(post, uri(["connectors_test"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{<<"bridge_type">> => <<"mqtt">>}).
?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}).
%%--------------------------------------------------------------------
%% HTTP Request

View File

@ -27,7 +27,7 @@
-export([ inc/3
, inc/4
, get/3
, get_speed/2
, get_rate/2
, create_metrics/2
, clear_metrics/2
]).
@ -54,7 +54,7 @@
-define(SECS_5M, 300).
-define(SAMPLING, 10).
-else.
%% Use 5 secs average speed instead of 5 mins in case of testing
%% Use 5 secs average rate instead of 5 mins in case of testing
-define(SECS_5M, 5).
-define(SAMPLING, 1).
-endif.
@ -65,9 +65,9 @@
matched => integer(),
success => integer(),
failed => integer(),
speed => float(),
speed_max => float(),
speed_last5m => float()
rate => float(),
rate_max => float(),
rate_last5m => float()
}.
-type handler_name() :: atom().
-type metric_id() :: binary().
@ -75,22 +75,22 @@
-define(CntrRef(Name), {?MODULE, Name}).
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
%% the speed of 'matched'
-record(speed, {
%% the rate of 'matched'
-record(rate, {
max = 0 :: number(),
current = 0 :: number(),
last5m = 0 :: number(),
%% metadata for calculating the avg speed
%% metadata for calculating the avg rate
tick = 1 :: number(),
last_v = 0 :: number(),
%% metadata for calculating the 5min avg speed
%% metadata for calculating the 5min avg rate
last5m_acc = 0 :: number(),
last5m_smpl = [] :: list()
}).
-record(state, {
metric_ids = sets:new(),
speeds :: undefined | #{metric_id() => #speed{}}
rates :: undefined | #{metric_id() => #rate{}}
}).
%%------------------------------------------------------------------------------
@ -122,19 +122,19 @@ get(Name, Id, Metric) ->
Ref -> counters:get(Ref, metrics_idx(Metric))
end.
-spec(get_speed(handler_name(), metric_id()) -> map()).
get_speed(Name, Id) ->
gen_server:call(Name, {get_speed, Id}).
-spec(get_rate(handler_name(), metric_id()) -> map()).
get_rate(Name, Id) ->
gen_server:call(Name, {get_rate, Id}).
-spec(get_metrics(handler_name(), metric_id()) -> metrics()).
get_metrics(Name, Id) ->
#{max := Max, current := Current, last5m := Last5M} = get_speed(Name, Id),
#{max := Max, current := Current, last5m := Last5M} = get_rate(Name, Id),
#{matched => get_matched(Name, Id),
success => get_success(Name, Id),
failed => get_failed(Name, Id),
speed => Current,
speed_max => Max,
speed_last5m => Last5M
rate => Current,
rate_max => Max,
rate_last5m => Last5M
}.
-spec inc(handler_name(), metric_id(), atom()) -> ok.
@ -176,35 +176,35 @@ start_link(Name) ->
init(Name) ->
erlang:process_flag(trap_exit, true),
%% the speed metrics
%% the rate metrics
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
persistent_term:put(?CntrRef(Name), #{}),
{ok, #state{}}.
handle_call({get_speed, _Id}, _From, State = #state{speeds = undefined}) ->
{reply, format_speed(#speed{}), State};
handle_call({get_speed, Id}, _From, State = #state{speeds = Speeds}) ->
{reply, case maps:get(Id, Speeds, undefined) of
undefined -> format_speed(#speed{});
Speed -> format_speed(Speed)
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
{reply, format_rate(#rate{}), State};
handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
{reply, case maps:get(Id, Rates, undefined) of
undefined -> format_rate(#rate{});
Rate -> format_rate(Rate)
end, State};
handle_call({create_metrics, Id}, _From,
State = #state{metric_ids = MIDs, speeds = Speeds}) ->
State = #state{metric_ids = MIDs, rates = Rates}) ->
{reply, create_counters(get_self_name(), Id),
State#state{metric_ids = sets:add_element(Id, MIDs),
speeds = case Speeds of
undefined -> #{Id => #speed{}};
_ -> Speeds#{Id => #speed{}}
rates = case Rates of
undefined -> #{Id => #rate{}};
_ -> Rates#{Id => #rate{}}
end}};
handle_call({delete_metrics, Id}, _From,
State = #state{metric_ids = MIDs, speeds = Speeds}) ->
State = #state{metric_ids = MIDs, rates = Rates}) ->
{reply, delete_counters(get_self_name(), Id),
State#state{metric_ids = sets:del_element(Id, MIDs),
speeds = case Speeds of
rates = case Rates of
undefined -> undefined;
_ -> maps:remove(Id, Speeds)
_ -> maps:remove(Id, Rates)
end}};
handle_call(_Request, _From, State) ->
@ -213,17 +213,17 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(ticking, State = #state{speeds = undefined}) ->
handle_info(ticking, State = #state{rates = undefined}) ->
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
{noreply, State};
handle_info(ticking, State = #state{speeds = Speeds0}) ->
Speeds = maps:map(
fun(Id, Speed) ->
calculate_speed(get_matched(get_self_name(), Id), Speed)
end, Speeds0),
handle_info(ticking, State = #state{rates = Rates0}) ->
Rates = maps:map(
fun(Id, Rate) ->
calculate_rate(get_matched(get_self_name(), Id), Rate)
end, Rates0),
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
{noreply, State#state{speeds = Speeds}};
{noreply, State#state{rates = Rates}};
handle_info(_Info, State) ->
{noreply, State}.
@ -261,38 +261,38 @@ get_couters_ref(Name, Id) ->
get_all_counters(Name) ->
persistent_term:get(?CntrRef(Name), #{}).
calculate_speed(_CurrVal, undefined) ->
calculate_rate(_CurrVal, undefined) ->
undefined;
calculate_speed(CurrVal, #speed{max = MaxSpeed0, last_v = LastVal,
tick = Tick, last5m_acc = AccSpeed5Min0,
calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
tick = Tick, last5m_acc = AccRate5Min0,
last5m_smpl = Last5MinSamples0}) ->
%% calculate the current speed based on the last value of the counter
CurrSpeed = (CurrVal - LastVal) / ?SAMPLING,
%% calculate the current rate based on the last value of the counter
CurrRate = (CurrVal - LastVal) / ?SAMPLING,
%% calculate the max speed since the emqx startup
MaxSpeed =
if MaxSpeed0 >= CurrSpeed -> MaxSpeed0;
true -> CurrSpeed
%% calculate the max rate since the emqx startup
MaxRate =
if MaxRate0 >= CurrRate -> MaxRate0;
true -> CurrRate
end,
%% calculate the average speed in last 5 mins
%% calculate the average rate in last 5 mins
{Last5MinSamples, Acc5Min, Last5Min} =
if Tick =< ?SAMPCOUNT_5M ->
Acc = AccSpeed5Min0 + CurrSpeed,
{lists:reverse([CurrSpeed | lists:reverse(Last5MinSamples0)]),
Acc = AccRate5Min0 + CurrRate,
{lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]),
Acc, Acc / Tick};
true ->
[FirstSpeed | Speeds] = Last5MinSamples0,
Acc = AccSpeed5Min0 + CurrSpeed - FirstSpeed,
{lists:reverse([CurrSpeed | lists:reverse(Speeds)]),
[FirstRate | Rates] = Last5MinSamples0,
Acc = AccRate5Min0 + CurrRate - FirstRate,
{lists:reverse([CurrRate | lists:reverse(Rates)]),
Acc, Acc / ?SAMPCOUNT_5M}
end,
#speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min,
#rate{max = MaxRate, current = CurrRate, last5m = Last5Min,
last_v = CurrVal, last5m_acc = Acc5Min,
last5m_smpl = Last5MinSamples, tick = Tick + 1}.
format_speed(#speed{max = Max, current = Current, last5m = Last5Min}) ->
format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) ->
#{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
precision(Float, N) ->

View File

@ -24,7 +24,7 @@
all() ->
[ {group, metrics}
, {group, speed} ].
, {group, rate} ].
suite() ->
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
@ -34,8 +34,8 @@ groups() ->
[ t_rule
, t_no_creation_1
]},
{speed, [sequence],
[ rule_speed
{rate, [sequence],
[ rule_rate
]}
].
@ -74,7 +74,7 @@ t_rule(_) ->
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
rule_speed(_) ->
rule_rate(_) ->
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
@ -83,11 +83,11 @@ rule_speed(_) ->
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
ct:sleep(1000),
?LET(#{max := Max, current := Current},
emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current =< 2)}),
ct:sleep(2100),
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current == 0),
?assert(Last5Min =< 0.67)}),

View File

@ -32,13 +32,40 @@ check_params(Params, Tag) ->
roots() ->
[ {"rule_creation", sc(ref("rule_creation"), #{desc => "Schema for creating rules"})}
, {"rule_info", sc(ref("rule_info"), #{desc => "Schema for rule info"})}
, {"rule_events", sc(ref("rule_events"), #{desc => "Schema for rule events"})}
, {"rule_test", sc(ref("rule_test"), #{desc => "Schema for testing rules"})}
].
fields("rule_creation") ->
[ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})}
[ {"id", sc(binary(),
#{ desc => "The Id of the rule", nullable => false
, example => "my_rule_id"
})}
] ++ emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
[ {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
, {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})}
, {"from", sc(hoconsc:array(binary()),
#{desc => "The topics of the rule", example => "t/#"})}
, {"created_at", sc(binary(),
#{ desc => "The created time of the rule"
, example => "2021-12-01T15:00:43.153+08:00"
})}
] ++ fields("rule_creation");
%% TODO: we can delete this API if the Dashboard not denpends on it
fields("rule_events") ->
ETopics = [emqx_rule_events:event_topic(E) || E <- emqx_rule_events:event_names()],
[ {"event", sc(hoconsc:enum(ETopics), #{desc => "The event topics", nullable => false})}
, {"title", sc(binary(), #{desc => "The title", example => "some title"})}
, {"description", sc(binary(), #{desc => "The description", example => "some desc"})}
, {"columns", sc(map(), #{desc => "The columns"})}
, {"test_columns", sc(map(), #{desc => "The test columns"})}
, {"sql_example", sc(binary(), #{desc => "The sql_example"})}
];
fields("rule_test") ->
[ {"context", sc(hoconsc:union([ ref("ctx_pub")
, ref("ctx_sub")
@ -53,6 +80,18 @@ fields("rule_test") ->
, {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})}
];
fields("metrics") ->
[ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})}
, {"rate", sc(float(), #{desc => "The rate of matched, times/second"})}
, {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})}
, {"rate_last5m", sc(float(),
#{desc => "The average rate of matched in last 5 mins, times/second"})}
];
fields("node_metrics") ->
[ {"node", sc(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}
] ++ fields("metrics");
fields("ctx_pub") ->
[ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})}
, {"id", sc(binary(), #{desc => "Message ID"})}

View File

@ -18,16 +18,17 @@
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("typerefl/include/types.hrl").
-behaviour(minirest_api).
-export([api_spec/0]).
-import(hoconsc, [mk/2, ref/2, array/1]).
-export([ crud_rules/2
, list_events/2
, crud_rules_by_id/2
, rule_test/2
]).
%% Swagger specs from hocon schema
-export([api_spec/0, paths/0, schema/1, namespace/0]).
%% API callbacks
-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]).
-define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
-define(ERR_BADARGS(REASON),
@ -43,210 +44,130 @@
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
end).
namespace() -> "rule".
api_spec() ->
{
[ api_rules_list_create()
, api_rules_crud()
, api_rule_test()
, api_events_list()
],
[]
}.
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
api_rules_list_create() ->
Metadata = #{
paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
, {message, mk(string(), #{example => Message})}
].
rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").
rule_update_schema() ->
ref(emqx_rule_engine_schema, "rules").
rule_test_schema() ->
ref(emqx_rule_api_schema, "rule_test").
rule_info_schema() ->
ref(emqx_rule_api_schema, "rule_info").
schema("/rules") ->
#{
operationId => '/rules',
get => #{
tags => [<<"rules">>],
description => <<"List all rules">>,
summary => <<"List Rules">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}},
200 => mk(array(rule_info_schema()), #{desc => "List of rules"})
}},
post => #{
description => <<"Create a new rule using given Id to all nodes in the cluster">>,
'requestBody' => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>),
tags => [<<"rules">>],
description => <<"Create a new rule using given Id">>,
summary => <<"Create a Rule">>,
requestBody => rule_creation_schema(),
responses => #{
<<"400">> =>
emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
<<"201">> =>
emqx_mgmt_util:schema(resp_schema(), <<"Create rule successfully">>)}}
},
{"/rules", Metadata, crud_rules}.
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
201 => rule_info_schema()
}}
};
api_events_list() ->
Metadata = #{
schema("/rule_events") ->
#{
operationId => '/rule_events',
get => #{
tags => [<<"rules">>],
description => <<"List all events can be used in rules">>,
summary => <<"List Events">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:array_schema(resp_schema(), <<"List events successfully">>)}}
},
{"/rule_events", Metadata, list_events}.
200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{})
}
}
};
api_rules_crud() ->
Metadata = #{
schema("/rules/:id") ->
#{
operationId => '/rules/:id',
get => #{
tags => [<<"rules">>],
description => <<"Get a rule by given Id">>,
parameters => [param_path_id()],
summary => <<"Get a Rule">>,
parameters => param_path_id(),
responses => #{
<<"404">> =>
emqx_mgmt_util:error_schema(<<"Rule not found">>, ['NOT_FOUND']),
<<"200">> =>
emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}},
404 => error_schema('NOT_FOUND', "Rule not found"),
200 => rule_info_schema()
}
},
put => #{
description => <<"Create or update a rule by given Id to all nodes in the cluster">>,
parameters => [param_path_id()],
'requestBody' => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>),
tags => [<<"rules">>],
description => <<"Update a rule by given Id to all nodes in the cluster">>,
summary => <<"Update a Rule">>,
parameters => param_path_id(),
requestBody => rule_update_schema(),
responses => #{
<<"400">> =>
emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
<<"200">> =>
emqx_mgmt_util:schema(resp_schema(),
<<"Create or update rule successfully">>)}},
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
200 => rule_info_schema()
}
},
delete => #{
tags => [<<"rules">>],
description => <<"Delete a rule by given Id from all nodes in the cluster">>,
parameters => [param_path_id()],
summary => <<"Delete a Rule">>,
parameters => param_path_id(),
responses => #{
<<"204">> =>
emqx_mgmt_util:schema(<<"Delete rule successfully">>)}}
},
{"/rules/:id", Metadata, crud_rules_by_id}.
204 => <<"Delete rule successfully">>
}
}
};
api_rule_test() ->
Metadata = #{
schema("/rule_test") ->
#{
operationId => '/rule_test',
post => #{
tags => [<<"rules">>],
description => <<"Test a rule">>,
'requestBody' => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>),
summary => <<"Test a Rule">>,
requestBody => rule_test_schema(),
responses => #{
<<"400">> =>
emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
<<"412">> =>
emqx_mgmt_util:error_schema(<<"SQL Not Match">>, ['NOT_MATCH']),
<<"200">> =>
emqx_mgmt_util:schema(rule_test_resp_schema(), <<"Rule Test Pass">>)}}
},
{"/rule_test", Metadata, rule_test}.
put_req_schema() ->
#{type => object,
properties => #{
sql => #{
description => <<"The SQL">>,
type => string,
example => <<"SELECT * from \"t/1\"">>
},
enable => #{
description => <<"Enable or disable the rule">>,
type => boolean,
example => true
},
outputs => #{
description => <<"The outputs of the rule">>,
type => array,
items => #{
'oneOf' => [
#{
type => string,
example => <<"channel_id_of_my_bridge">>,
description => <<"The channel id of an emqx bridge">>
},
#{
type => object,
properties => #{
function => #{
type => string,
example => <<"console">>
}
}
}
]
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
412 => error_schema('NOT_MATCH', "SQL Not Match"),
200 => <<"Rule Test Pass">>
}
},
description => #{
description => <<"The description for the rule">>,
type => string,
example => <<"A simple rule that handles MQTT messages from topic \"t/1\"">>
}
}
}.
post_req_schema() ->
Req = #{properties := Prop} = put_req_schema(),
Req#{properties => Prop#{
id => #{
description => <<"The Id for the rule">>,
example => <<"my_rule">>,
type => string
}
}}.
resp_schema() ->
Req = #{properties := Prop} = put_req_schema(),
Req#{properties => Prop#{
id => #{
description => <<"The Id for the rule">>,
type => string
},
created_at => #{
description => <<"The time that this rule was created, in rfc3339 format">>,
type => string,
example => <<"2021-09-18T13:57:29+08:00">>
}
}}.
rule_test_req_schema() ->
#{type => object, properties => #{
sql => #{
description => <<"The SQL">>,
type => string,
example => <<"SELECT * from \"t/1\"">>
},
context => #{
type => object,
properties => #{
event_type => #{
description => <<"Event Type">>,
type => string,
enum => [<<"message_publish">>, <<"message_acked">>, <<"message_delivered">>,
<<"message_dropped">>, <<"session_subscribed">>, <<"session_unsubscribed">>,
<<"client_connected">>, <<"client_disconnected">>],
example => <<"message_publish">>
},
clientid => #{
description => <<"The Client ID">>,
type => string,
example => <<"\"c_emqx\"">>
},
topic => #{
description => <<"The Topic">>,
type => string,
example => <<"t/1">>
}
}
}
}}.
rule_test_resp_schema() ->
#{type => object}.
param_path_id() ->
#{
name => id,
in => path,
schema => #{type => string},
required => true
}.
[{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}].
%%------------------------------------------------------------------------------
%% Rules API
%%------------------------------------------------------------------------------
list_events(#{}, _Params) ->
'/rule_events'(get, _Params) ->
{200, emqx_rule_events:event_info()}.
crud_rules(get, _Params) ->
'/rules'(get, _Params) ->
Records = emqx_rule_engine:get_rules_ordered_by_ts(),
{200, format_rule_resp(Records)};
crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
'/rules'(post, #{body := #{<<"id">> := Id} = Params}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} ->
@ -263,13 +184,13 @@ crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
end
end.
rule_test(post, #{body := Params}) ->
'/rule_test'(post, #{body := Params}) ->
?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of
{ok, Result} -> {200, Result};
{error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}
end).
crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
'/rules/:id'(get, #{bindings := #{id := Id}}) ->
case emqx_rule_engine:get_rule(Id) of
{ok, Rule} ->
{200, format_rule_resp(Rule)};
@ -277,7 +198,7 @@ crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end;
crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
@ -289,7 +210,7 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
end;
crud_rules_by_id(delete, #{bindings := #{id := Id}}) ->
'/rules/:id'(delete, #{bindings := #{id := Id}}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:remove_config(ConfPath, #{}) of
{ok, _} -> {204};
@ -315,11 +236,13 @@ format_rule_resp(#{ id := Id, created_at := CreatedAt,
sql := SQL,
enabled := Enabled,
description := Descr}) ->
NodeMetrics = get_rule_metrics(Id),
#{id => Id,
from => Topics,
outputs => format_output(Output),
sql => SQL,
metrics => get_rule_metrics(Id),
metrics => aggregate_metrics(NodeMetrics),
node_metrics => NodeMetrics,
enabled => Enabled,
created_at => format_datetime(CreatedAt, millisecond),
description => Descr
@ -339,19 +262,28 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
get_rule_metrics(Id) ->
Format = fun (Node, #{matched := Matched,
speed := Current,
speed_max := Max,
speed_last5m := Last5M
rate := Current,
rate_max := Max,
rate_last5m := Last5M
}) ->
#{ matched => Matched
, speed => Current
, speed_max => Max
, speed_last5m => Last5M
, rate => Current
, rate_max => Max
, rate_last5m => Last5M
, node => Node
}
end,
[Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id]))
|| Node <- mria_mnesia:running_nodes()].
aggregate_metrics(AllMetrics) ->
InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0},
lists:foldl(fun
(#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1},
#{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) ->
#{matched => Match1 + Match0, rate => Rate1 + Rate0,
rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0}
end, InitMetrics, AllMetrics).
get_one_rule(AllRules, Id) ->
[R || R = #{id := Id0} <- AllRules, Id0 == Id].

View File

@ -44,19 +44,17 @@ fields("rules") ->
SQL query to transform the messages.<br>
Example: <code>SELECT * FROM \"test/topic\" WHERE payload.x = 1</code><br>
"""
, example => "SELECT * FROM \"test/topic\" WHERE payload.x = 1"
, nullable => false
, validator => fun ?MODULE:validate_sql/1})}
, {"outputs", sc(hoconsc:array(hoconsc:union(
[ binary()
, ref("builtin_output_republish")
, ref("builtin_output_console")
])),
, validator => fun ?MODULE:validate_sql/1
})}
, {"outputs", sc(hoconsc:array(hoconsc:union(outputs())),
#{ desc => """
A list of outputs of the rule.<br>
An output can be a string that refers to the channel Id of a emqx bridge, or a object
that refers to a function.<br>
There a some built-in functions like \"republish\" and \"console\", and we also support user
provided functions like \"ModuleName:FunctionName\".<br>
provided functions in the format: \"{module}:{function}\".<br>
The outputs in the list is executed one by one in order.
This means that if one of the output is executing slowly, all of the outputs comes after it will not
be executed until it returns.<br>
@ -66,9 +64,19 @@ If there's any error when running an output, there will be an error message, and
counter of the function output or the bridge channel will increase.
"""
, default => []
, example => [
<<"http:my_http_bridge">>,
#{function => republish, args => #{
topic => <<"t/1">>, payload => <<"${payload}">>}},
#{function => console}
]
})}
, {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
, {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
, {"description", sc(binary(),
#{ desc => "The description of the rule"
, example => "Some description"
, default => <<>>
})}
];
fields("builtin_output_republish") ->
@ -106,6 +114,27 @@ fields("builtin_output_console") ->
% default => #{}})}
];
fields("user_provided_function") ->
[ {function, sc(binary(),
#{ desc => """
The user provided function. Should be in the format: '{module}:{function}'.<br>
Where the <module> is the erlang callback module and the {function} is the erlang function.<br>
To write your own function, checkout the function <code>console</code> and
<code>republish</code> in the source file:
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example.
"""
, example => "module:function"
})}
, {args, sc(map(),
#{ desc => """
The args will be passed as the 3rd argument to module:function/3,
checkout the function <code>console</code> and <code>republish</code> in the source file:
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example.
"""
, default => #{}
})}
];
fields("republish_args") ->
[ {topic, sc(binary(),
#{ desc =>"""
@ -113,8 +142,9 @@ The target topic of message to be re-published.<br>
Template with variables is allowed, see description of the 'republish_args'.
"""
, nullable => false
, example => <<"a/1">>
})}
, {qos, sc(binary(),
, {qos, sc(qos(),
#{ desc => """
The qos of the message to be re-published.
Template with with variables is allowed, see description of the 'republish_args.<br>
@ -122,8 +152,9 @@ Defaults to ${qos}. If variable ${qos} is not found from the selected result of
0 is used.
"""
, default => <<"${qos}">>
, example => <<"${qos}">>
})}
, {retain, sc(binary(),
, {retain, sc(hoconsc:union([binary(), boolean()]),
#{ desc => """
The retain flag of the message to be re-published.
Template with with variables is allowed, see description of the 'republish_args.<br>
@ -131,6 +162,7 @@ Defaults to ${retain}. If variable ${retain} is not found from the selected resu
of the rule, false is used.
"""
, default => <<"${retain}">>
, example => <<"${retain}">>
})}
, {payload, sc(binary(),
#{ desc => """
@ -140,9 +172,20 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
of the rule, then the string \"undefined\" is used.
"""
, default => <<"${payload}">>
, example => <<"${payload}">>
})}
].
outputs() ->
[ binary()
, ref("builtin_output_republish")
, ref("builtin_output_console")
, ref("user_provided_function")
].
qos() ->
hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of
{ok, _Result} -> ok;

View File

@ -25,7 +25,9 @@
, load/1
, unload/0
, unload/1
, event_names/0
, event_name/1
, event_topic/1
, eventmsg_publish/1
]).
@ -45,17 +47,6 @@
, columns_with_exam/1
]).
-define(SUPPORTED_HOOK,
[ 'client.connected'
, 'client.disconnected'
, 'session.subscribed'
, 'session.unsubscribed'
, 'message.publish'
, 'message.delivered'
, 'message.acked'
, 'message.dropped'
]).
-ifdef(TEST).
-export([ reason/1
, hook_fun/1
@ -63,6 +54,17 @@
]).
-endif.
event_names() ->
[ 'client.connected'
, 'client.disconnected'
, 'session.subscribed'
, 'session.unsubscribed'
, 'message.publish'
, 'message.delivered'
, 'message.acked'
, 'message.dropped'
].
reload() ->
lists:foreach(fun(Rule) ->
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
@ -78,7 +80,7 @@ load(Topic) ->
unload() ->
lists:foreach(fun(HookPoint) ->
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
end, ?SUPPORTED_HOOK).
end, event_names()).
unload(Topic) ->
HookPoint = event_name(Topic),

View File

@ -247,9 +247,9 @@ handle_output(OutId, Selected, Envs) ->
})
end.
do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) ->
?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}),
emqx_bridge:send_message(ChannelId, Selected);
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}),
emqx_bridge:send_message(BridgeId, Selected);
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
Mod:Func(Selected, Envs, Args).

View File

@ -36,34 +36,34 @@ t_crud_rule_api(_Config) ->
<<"outputs">> => [#{<<"function">> => <<"console">>}],
<<"sql">> => <<"SELECT * from \"t/1\"">>
},
{201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}),
{201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params0}),
%% if we post again with the same params, it return with 400 "rule id already exists"
?assertMatch({400, #{code := _, message := _Message}},
emqx_rule_engine_api:crud_rules(post, #{body => Params0})),
emqx_rule_engine_api:'/rules'(post, #{body => Params0})),
?assertEqual(RuleID, maps:get(id, Rule)),
{200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}),
{200, Rules} = emqx_rule_engine_api:'/rules'(get, #{}),
ct:pal("RList : ~p", [Rules]),
?assert(length(Rules) > 0),
{200, Rule1} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
{200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
ct:pal("RShow : ~p", [Rule1]),
?assertEqual(Rule, Rule1),
{200, Rule2} = emqx_rule_engine_api:crud_rules_by_id(put, #{
{200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
bindings => #{id => RuleID},
body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>}
}),
{200, Rule3} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
{200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
%ct:pal("RShow : ~p", [Rule3]),
?assertEqual(Rule3, Rule2),
?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
?assertMatch({204}, emqx_rule_engine_api:crud_rules_by_id(delete,
?assertMatch({204}, emqx_rule_engine_api:'/rules/:id'(delete,
#{bindings => #{id => RuleID}})),
%ct:pal("Show After Deleted: ~p", [NotFound]),
?assertMatch({404, #{code := _, message := _Message}},
emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}})),
emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}})),
ok.