diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 6e274903e..5b2b62d82 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -17,220 +17,266 @@ -behaviour(minirest_api). --export([api_spec/0]). +-include_lib("typerefl/include/types.hrl"). --export([ list_create_bridges_in_cluster/2 - , list_local_bridges/1 - , crud_bridges_in_cluster/2 - , manage_bridges/2 +-import(hoconsc, [mk/2, array/1, enum/1]). + +%% Swagger specs from hocon schema +-export([api_spec/0, paths/0, schema/1, namespace/0]). + +%% API callbacks +-export(['/bridges'/2, '/bridges/:id'/2, + '/nodes/:node/bridges/:id/operation/:operation'/2]). + +-export([ list_local_bridges/1 , lookup_from_local_node/2 ]). -define(TYPES, [mqtt, http]). + +-define(CONN_TYPES, [mqtt]). + -define(TRY_PARSE_ID(ID, EXPR), try emqx_bridge:parse_bridge_id(Id) of {BridgeType, BridgeName} -> EXPR catch error:{invalid_bridge_id, Id0} -> {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary, - ". Bridge ID must be of format 'bridge_type:name'">>}} + ". Bridge Ids must be of format {type}:{name}">>}} end). -define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ matched => MATCH, success => SUCC, failed => FAILED, - speed => RATE, - speed_last5m => RATE_5, - speed_max => RATE_MAX + rate => RATE, + rate_last5m => RATE_5, + rate_max => RATE_MAX }). -define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ matched := MATCH, success := SUCC, failed := FAILED, - speed := RATE, - speed_last5m := RATE_5, - speed_max := RATE_MAX + rate := RATE, + rate_last5m := RATE_5, + rate_max := RATE_MAX }). -req_schema() -> - Schema = [ - case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of - %% the bridge is not configured, so we have no method to get the schema - [] -> #{}; - [{_K, Conf} | _] -> - emqx_mgmt_api_configs:gen_schema(Conf) - end - || T <- ?TYPES], - #{'oneOf' => Schema}. - -node_schema() -> - #{type => string, example => "emqx@127.0.0.1"}. - -status_schema() -> - #{type => string, enum => [connected, disconnected]}. - -metrics_schema() -> - #{ type => object - , properties => #{ - matched => #{type => integer, example => "0"}, - success => #{type => integer, example => "0"}, - failed => #{type => integer, example => "0"}, - speed => #{type => number, format => float, example => "0.0"}, - speed_last5m => #{type => number, format => float, example => "0.0"}, - speed_max => #{type => number, format => float, example => "0.0"} - } - }. - -per_node_schema(Key, Schema) -> - #{ - type => array, - items => #{ - type => object, - properties => #{ - node => node_schema(), - Key => Schema - } - } - }. - -resp_schema() -> - AddMetadata = fun(Prop) -> - Prop#{status => status_schema(), - node_status => per_node_schema(status, status_schema()), - metrics => metrics_schema(), - node_metrics => per_node_schema(metrics, metrics_schema()), - id => #{type => string, example => "http:my_http_bridge"}, - bridge_type => #{type => string, enum => ?TYPES}, - node => node_schema() - } - end, - more_props_resp_schema(AddMetadata). - -more_props_resp_schema(AddMetadata) -> - #{'oneOf' := Schema} = req_schema(), - Schema1 = [S#{properties => AddMetadata(Prop)} - || S = #{properties := Prop} <- Schema], - #{'oneOf' => Schema1}. +namespace() -> "bridge". api_spec() -> - {bridge_apis(), []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). -bridge_apis() -> - [list_all_bridges_api(), crud_bridges_apis(), operation_apis()]. +paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"]. -list_all_bridges_api() -> - ReqSchema = more_props_resp_schema(fun(Prop) -> - Prop#{id => #{type => string, required => true}} - end), - RespSchema = resp_schema(), - Metadata = #{ +error_schema(Code, Message) -> + [ {code, mk(string(), #{example => Code})} + , {message, mk(string(), #{example => Message})} + ]. + +get_response_body_schema() -> + emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(), + bridge_info_examples(get)). + +param_path_node() -> + path_param(node, binary(), atom_to_binary(node(), utf8)). + +param_path_operation() -> + path_param(operation, enum([start, stop, restart]), <<"start">>). + +param_path_id() -> + path_param(id, binary(), <<"http:my_http_bridge">>). + +path_param(Name, Type, Example) -> + {Name, mk(Type, + #{ in => path + , required => true + , example => Example + })}. + +bridge_info_array_example(Method) -> + [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))]. + +bridge_info_examples(Method) -> + maps:merge(conn_bridge_examples(Method), #{ + <<"http_bridge">> => #{ + summary => <<"HTTP Bridge">>, + value => info_example(http, awesome, Method) + } + }). + +conn_bridge_examples(Method) -> + lists:foldl(fun(Type, Acc) -> + SType = atom_to_list(Type), + KeyIngress = bin(SType ++ "_ingress"), + KeyEgress = bin(SType ++ "_egress"), + maps:merge(Acc, #{ + KeyIngress => #{ + summary => bin(string:uppercase(SType) ++ " Ingress Bridge"), + value => info_example(Type, ingress, Method) + }, + KeyEgress => #{ + summary => bin(string:uppercase(SType) ++ " Egress Bridge"), + value => info_example(Type, egress, Method) + } + }) + end, #{}, ?CONN_TYPES). + +info_example(Type, Direction, Method) -> + maps:merge(info_example_basic(Type, Direction), + method_example(Type, Direction, Method)). + +method_example(Type, Direction, get) -> + SType = atom_to_list(Type), + SDir = atom_to_list(Direction), + SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge", + #{ + id => bin(SType ++ ":" ++ SName), + type => bin(SType), + name => bin(SName) + }; +method_example(Type, Direction, post) -> + SType = atom_to_list(Type), + SDir = atom_to_list(Direction), + SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge", + #{ + type => bin(SType), + name => bin(SName) + }; +method_example(_Type, _Direction, put) -> + #{}. + +info_example_basic(http, _) -> + #{ + url => <<"http://localhost:9901/messages/${topic}">>, + request_timeout => <<"30s">>, + connect_timeout => <<"30s">>, + max_retries => 3, + retry_interval => <<"10s">>, + pool_type => <<"random">>, + pool_size => 4, + enable_pipelining => true, + ssl => #{enable => false}, + from_local_topic => <<"emqx_http/#">>, + method => post, + body => <<"${payload}">> + }; +info_example_basic(mqtt, ingress) -> + #{ + connector => <<"mqtt:my_mqtt_connector">>, + direction => ingress, + from_remote_topic => <<"aws/#">>, + subscribe_qos => 1, + to_local_topic => <<"from_aws/${topic}">>, + payload => <<"${payload}">>, + qos => <<"${qos}">>, + retain => <<"${retain}">> + }; +info_example_basic(mqtt, egress) -> + #{ + connector => <<"mqtt:my_mqtt_connector">>, + direction => egress, + from_local_topic => <<"emqx/#">>, + to_remote_topic => <<"from_emqx/${topic}">>, + payload => <<"${payload}">>, + qos => 1, + retain => false + }. + +schema("/bridges") -> + #{ + operationId => '/bridges', get => #{ + tags => [<<"bridges">>], + summary => <<"List Bridges">>, description => <<"List all created bridges">>, responses => #{ - <<"200">> => emqx_mgmt_util:array_schema(resp_schema(), - <<"A list of the bridges">>) + 200 => emqx_dashboard_swagger:schema_with_example( + array(emqx_bridge_schema:get_response()), + bridge_info_array_example(get)) } }, post => #{ + tags => [<<"bridges">>], + summary => <<"Create Bridge">>, description => <<"Create a new bridge">>, - 'requestBody' => emqx_mgmt_util:schema(ReqSchema), + requestBody => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_schema:post_request(), + bridge_info_examples(post)), responses => #{ - <<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>), - <<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>, - ['UPDATE_FAILED']) + 201 => get_response_body_schema(), + 400 => error_schema('BAD_ARG', "Create bridge failed") } } - }, - {"/bridges/", Metadata, list_create_bridges_in_cluster}. + }; -crud_bridges_apis() -> - ReqSchema = req_schema(), - RespSchema = resp_schema(), - Metadata = #{ +schema("/bridges/:id") -> + #{ + operationId => '/bridges/:id', get => #{ + tags => [<<"bridges">>], + summary => <<"Get Bridge">>, description => <<"Get a bridge by Id">>, parameters => [param_path_id()], responses => #{ - <<"200">> => emqx_mgmt_util:array_schema(RespSchema, - <<"The details of the bridge">>), - <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND']) + 200 => get_response_body_schema(), + 404 => error_schema('NOT_FOUND', "Bridge not found") } }, put => #{ + tags => [<<"bridges">>], + summary => <<"Update Bridge">>, description => <<"Update a bridge">>, parameters => [param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(ReqSchema), + requestBody => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_schema:put_request(), + bridge_info_examples(put)), responses => #{ - <<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>), - <<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>, - ['UPDATE_FAILED']) + 200 => get_response_body_schema(), + 400 => error_schema('BAD_ARG', "Update bridge failed") } }, delete => #{ + tags => [<<"bridges">>], + summary => <<"Delete Bridge">>, description => <<"Delete a bridge">>, parameters => [param_path_id()], responses => #{ - <<"204">> => emqx_mgmt_util:schema(<<"Bridge deleted">>), - <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND']) + 204 => <<"Bridge deleted">> } } - }, - {"/bridges/:id", Metadata, crud_bridges_in_cluster}. + }; -operation_apis() -> - Metadata = #{ +schema("/nodes/:node/bridges/:id/operation/:operation") -> + #{ + operationId => '/nodes/:node/bridges/:id/operation/:operation', post => #{ + tags => [<<"bridges">>], + summary => <<"Start/Stop/Restart Bridge">>, description => <<"Start/Stop/Restart bridges on a specific node">>, parameters => [ param_path_node(), param_path_id(), - param_path_operation()], + param_path_operation() + ], responses => #{ - <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>, - ['INTERNAL_ERROR']), - <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, - {"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}. - -param_path_node() -> - #{ - name => node, - in => path, - schema => #{type => string}, - required => true, - example => node() + 500 => error_schema('INTERNAL_ERROR', "Operation Failed"), + 200 => <<"Operation success">> + } + } }. -param_path_id() -> - #{ - name => id, - in => path, - schema => #{type => string}, - required => true - }. - -param_path_operation()-> - #{ - name => operation, - in => path, - required => true, - schema => #{ - type => string, - enum => [start, stop, restart]}, - example => restart - }. - -list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) -> - ?TRY_PARSE_ID(Id, - case emqx_bridge:lookup(BridgeType, BridgeName) of - {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}}; - {error, not_found} -> - case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of - ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201); - {error, Error} -> {400, Error} - end - end); -list_create_bridges_in_cluster(get, _Params) -> +'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) -> + BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()), + case emqx_bridge:lookup(BridgeType, BridgeName) of + {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}}; + {error, not_found} -> + case ensure_bridge_created(BridgeType, BridgeName, Conf) of + ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201); + {error, Error} -> {400, Error} + end + end; +'/bridges'(get, _Params) -> {200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}. list_local_bridges(Node) when Node =:= node() -> @@ -238,22 +284,22 @@ list_local_bridges(Node) when Node =:= node() -> list_local_bridges(Node) -> rpc_call(Node, list_local_bridges, [Node]). -crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) -> - ?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200)); +'/bridges/:id'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); -crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) -> +'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) -> ?TRY_PARSE_ID(Id, case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> - case ensure_bridge(BridgeType, BridgeName, Conf) of - ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200); + case ensure_bridge_created(BridgeType, BridgeName, Conf) of + ok -> lookup_from_all_nodes(BridgeType, BridgeName, 200); {error, Error} -> {400, Error} end; {error, not_found} -> {404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}} end); -crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) -> +'/bridges/:id'(delete, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], #{override_to => cluster}) of @@ -262,12 +308,12 @@ crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) -> {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}} end). -lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) -> +lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of {ok, [{ok, _} | _] = Results} -> {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> - {404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)}; + {404, error_msg('NOT_FOUND', <<"not_found">>)}; {error, ErrL} -> {500, error_msg('UNKNOWN_ERROR', ErrL)} end. @@ -278,7 +324,8 @@ lookup_from_local_node(BridgeType, BridgeName) -> Error -> Error end. -manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) -> +'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings := + #{node := Node, id := Id, operation := Op}}) -> OperFun = fun (<<"start">>) -> start; (<<"stop">>) -> stop; @@ -292,9 +339,10 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}} end). -ensure_bridge(BridgeType, BridgeName, Conf) -> - case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf, - #{override_to => cluster}) of +ensure_bridge_created(BridgeType, BridgeName, Conf) -> + Conf1 = maps:without([<<"type">>, <<"name">>], Conf), + case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], + Conf1, #{override_to => cluster}) of {ok, _} -> ok; {error, Reason} -> {error, error_msg('BAD_ARG', Reason)} @@ -346,12 +394,14 @@ aggregate_metrics(AllMetrics) -> end, InitMetrics, AllMetrics). format_resp(#{id := Id, raw_config := RawConf, - resource_data := #{mod := Mod, status := Status, metrics := Metrics}}) -> + resource_data := #{status := Status, metrics := Metrics}}) -> + {Type, Name} = emqx_bridge:parse_bridge_id(Id), IsConnected = fun(started) -> connected; (_) -> disconnected end, RawConf#{ id => Id, + type => Type, + name => Name, node => node(), - bridge_type => emqx_bridge:bridge_type(Mod), status => IsConnected(Status), metrics => Metrics }. @@ -378,4 +428,7 @@ rpc_call(Node, Mod, Fun, Args) -> error_msg(Code, Msg) when is_binary(Msg) -> #{code => Code, message => Msg}; error_msg(Code, Msg) -> - #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}. + #{code => Code, message => bin(io_lib:format("~p", [Msg]))}. + +bin(S) when is_list(S) -> + list_to_binary(S). diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl new file mode 100644 index 000000000..2bef474bd --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -0,0 +1,95 @@ +-module(emqx_bridge_http_schema). + +-include_lib("typerefl/include/types.hrl"). + +-import(hoconsc, [mk/2, enum/1]). + +-export([roots/0, fields/1]). + +%%====================================================================================== +%% Hocon Schema Definitions +roots() -> []. + +fields("bridge") -> + basic_config() ++ + [ {url, mk(binary(), + #{ nullable => false + , desc =>""" +The URL of the HTTP Bridge.
+Template with variables is allowed in the path, but variables cannot be used in the scheme, host, +or port part.
+For example, http://localhost:9901/${topic} is allowed, but + http://${host}:9901/message or http://localhost:${port}/message +is not allowed. +""" + })} + , {from_local_topic, mk(binary(), + #{ desc =>""" +The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic +match the from_local_topic will be forwarded.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches +from_local_topic will be forwarded. +""" + })} + , {method, mk(method(), + #{ default => post + , desc =>""" +The method of the HTTP request. All the available methods are: post, put, get, delete.
+Template with variables is allowed.
+""" + })} + , {headers, mk(map(), + #{ default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">>} + , desc =>""" +The headers of the HTTP request.
+Template with variables is allowed. +""" + }) + } + , {body, mk(binary(), + #{ default => <<"${payload}">> + , desc =>""" +The body of the HTTP request.
+Template with variables is allowed. +""" + })} + , {request_timeout, mk(emqx_schema:duration_ms(), + #{ default => <<"30s">> + , desc =>""" +How long will the HTTP request timeout. +""" + })} + ]; + +fields("post") -> + [ type_field() + , name_field() + ] ++ fields("bridge"); + +fields("put") -> + fields("bridge"); + +fields("get") -> + [ id_field() + ] ++ fields("post"). + +basic_config() -> + proplists:delete(base_url, emqx_connector_http:fields(config)). + +%%====================================================================================== +id_field() -> + {id, mk(binary(), #{desc => "The Bridge Id", example => "http:my_http_bridge"})}. + +type_field() -> + {type, mk(http, #{desc => "The Bridge Type"})}. + +name_field() -> + {name, mk(binary(), #{desc => "The Bridge Name"})}. + +method() -> + enum([post, put, get, delete]). diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl new file mode 100644 index 000000000..d2cf6b1a8 --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl @@ -0,0 +1,62 @@ +-module(emqx_bridge_mqtt_schema). + +-include_lib("typerefl/include/types.hrl"). + +-import(hoconsc, [mk/2]). + +-export([roots/0, fields/1]). + +%%====================================================================================== +%% Hocon Schema Definitions +roots() -> []. + +fields("ingress") -> + [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc()) + , emqx_bridge_schema:connector_name() + ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress")); + +fields("egress") -> + [ direction(egress, emqx_connector_mqtt_schema:egress_desc()) + , emqx_bridge_schema:connector_name() + ] ++ emqx_connector_mqtt_schema:fields("egress"); + +fields("post_ingress") -> + [ type_field() + , name_field() + ] ++ fields("ingress"); +fields("post_egress") -> + [ type_field() + , name_field() + ] ++ fields("egress"); + +fields("put_ingress") -> + fields("ingress"); +fields("put_egress") -> + fields("egress"); + +fields("get_ingress") -> + [ id_field() + ] ++ fields("post_ingress"); +fields("get_egress") -> + [ id_field() + ] ++ fields("post_egress"). + +%%====================================================================================== +direction(Dir, Desc) -> + {direction, mk(Dir, + #{ nullable => false + , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" + ++ Desc + })}. + +id_field() -> + {id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}. + +type_field() -> + {type, mk(mqtt, #{desc => "The Bridge Type"})}. + +name_field() -> + {name, mk(binary(), + #{ desc => "The Bridge Name" + , example => "some_bridge_name" + })}. diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 3a3151ef0..ec875d0a4 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -2,123 +2,63 @@ -include_lib("typerefl/include/types.hrl"). +-import(hoconsc, [mk/2, ref/2]). + -export([roots/0, fields/1]). +-export([ get_response/0 + , put_request/0 + , post_request/0 + ]). + +-export([ connector_name/0 + ]). + %%====================================================================================== %% Hocon Schema Definitions -roots() -> [bridges]. +-define(CONN_TYPES, [mqtt]). -fields(bridges) -> - [ {mqtt, - sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge") - , ref("egress_mqtt_bridge") - ])), - #{ desc => "MQTT bridges" - })} - , {http, - sc(hoconsc:map(name, ref("http_bridge")), - #{ desc => "HTTP bridges" - })} - ]; - -fields("ingress_mqtt_bridge") -> - [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc()) - , connector_name() - ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress")); - -fields("egress_mqtt_bridge") -> - [ direction(egress, emqx_connector_mqtt_schema:egress_desc()) - , connector_name() - ] ++ emqx_connector_mqtt_schema:fields("egress"); - -fields("http_bridge") -> - basic_config_http() ++ - [ {url, - sc(binary(), - #{ nullable => false - , desc =>""" -The URL of the HTTP Bridge.
-Template with variables is allowed in the path, but variables cannot be used in the scheme, host, -or port part.
-For example, http://localhost:9901/${topic} is allowed, but - http://${host}:9901/message or http://localhost:${port}/message -is not allowed. -""" - })} - , {from_local_topic, - sc(binary(), - #{ desc =>""" -The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic -match the from_local_topic will be forwarded.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches -from_local_topic will be forwarded. -""" - })} - , {method, - sc(method(), - #{ default => post - , desc =>""" -The method of the HTTP request. All the available methods are: post, put, get, delete.
-Template with variables is allowed.
-""" - })} - , {headers, - sc(map(), - #{ default => #{ - <<"accept">> => <<"application/json">>, - <<"cache-control">> => <<"no-cache">>, - <<"connection">> => <<"keep-alive">>, - <<"content-type">> => <<"application/json">>, - <<"keep-alive">> => <<"timeout=5">>} - , desc =>""" -The headers of the HTTP request.
-Template with variables is allowed. -""" - }) - } - , {body, - sc(binary(), - #{ default => <<"${payload}">> - , desc =>""" -The body of the HTTP request.
-Template with variables is allowed. -""" - })} - , {request_timeout, - sc(emqx_schema:duration_ms(), - #{ default => <<"30s">> - , desc =>""" -How long will the HTTP request timeout. -""" - })} - ]. - -direction(Dir, Desc) -> - {direction, - sc(Dir, - #{ nullable => false - , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++ - Desc - })}. +%%====================================================================================== +%% For HTTP APIs +get_response() -> + http_schema("get"). connector_name() -> {connector, - sc(binary(), + mk(binary(), #{ nullable => false , desc =>""" The connector name to be used for this bridge. -Connectors are configured as 'connectors.type.name', +Connectors are configured as 'connectors.{type}.{name}', for example 'connectors.http.mybridge'. """ })}. -basic_config_http() -> - proplists:delete(base_url, emqx_connector_http:fields(config)). +put_request() -> + http_schema("put"). -method() -> - hoconsc:enum([post, put, get, delete]). +post_request() -> + http_schema("post"). -sc(Type, Meta) -> hoconsc:mk(Type, Meta). +http_schema(Method) -> + Schemas = lists:flatmap(fun(Type) -> + [ref(schema_mod(Type), Method ++ "_ingress"), + ref(schema_mod(Type), Method ++ "_egress")] + end, ?CONN_TYPES), + hoconsc:union([ref(emqx_bridge_http_schema, Method) + | Schemas]). -ref(Field) -> hoconsc:ref(?MODULE, Field). +%%====================================================================================== +%% For config files +roots() -> [bridges]. + +fields(bridges) -> + [{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}] + ++ [{T, mk(hoconsc:map(name, hoconsc:union([ + ref(schema_mod(T), "ingress"), + ref(schema_mod(T), "egress") + ])), #{})} || T <- ?CONN_TYPES]. + +schema_mod(Type) -> + list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 23d4691f5..52c8a32de 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -21,7 +21,9 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(CONF_DEFAULT, <<"bridges: {}">>). --define(TEST_ID, <<"http:test_bridge">>). +-define(BRIDGE_TYPE, <<"http">>). +-define(BRIDGE_NAME, <<"test_bridge">>). +-define(BRIDGE_ID, <<"http:test_bridge">>). -define(URL(PORT, PATH), list_to_binary( io_lib:format("http://localhost:~s/~s", [integer_to_list(PORT), PATH]))). @@ -134,11 +136,15 @@ t_http_crud_apis(_) -> %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}), + ?HTTP_BRIDGE(URL1)#{ + <<"type">> => ?BRIDGE_TYPE, + <<"name">> => ?BRIDGE_NAME + }), %ct:pal("---bridge: ~p", [Bridge]), - ?assertMatch(#{ <<"id">> := ?TEST_ID - , <<"bridge_type">> := <<"http">> + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ , <<"node_status">> := [_|_] , <<"metrics">> := _ @@ -148,7 +154,10 @@ t_http_crud_apis(_) -> %% create a again returns an error {ok, 400, RetMsg} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}), + ?HTTP_BRIDGE(URL1)#{ + <<"type">> => ?BRIDGE_TYPE, + <<"name">> => ?BRIDGE_NAME + }), ?assertMatch( #{ <<"code">> := _ , <<"message">> := <<"bridge already exists">> @@ -156,10 +165,11 @@ t_http_crud_apis(_) -> %% update the request-path of the bridge URL2 = ?URL(Port, "path2"), - {ok, 200, Bridge2} = request(put, uri(["bridges", ?TEST_ID]), + {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), ?HTTP_BRIDGE(URL2)), - ?assertMatch(#{ <<"id">> := ?TEST_ID - , <<"bridge_type">> := <<"http">> + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ , <<"node_status">> := [_|_] , <<"metrics">> := _ @@ -169,8 +179,9 @@ t_http_crud_apis(_) -> %% list all bridges again, assert Bridge2 is in it {ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []), - ?assertMatch([#{ <<"id">> := ?TEST_ID - , <<"bridge_type">> := <<"http">> + ?assertMatch([#{ <<"id">> := ?BRIDGE_ID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ , <<"node_status">> := [_|_] , <<"metrics">> := _ @@ -179,9 +190,10 @@ t_http_crud_apis(_) -> }], jsx:decode(Bridge2Str)), %% get the bridge by id - {ok, 200, Bridge3Str} = request(get, uri(["bridges", ?TEST_ID]), []), - ?assertMatch(#{ <<"id">> := ?TEST_ID - , <<"bridge_type">> := <<"http">> + {ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ , <<"node_status">> := [_|_] , <<"metrics">> := _ @@ -190,11 +202,11 @@ t_http_crud_apis(_) -> }, jsx:decode(Bridge3Str)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% update a deleted bridge returns an error - {ok, 404, ErrMsg2} = request(put, uri(["bridges", ?TEST_ID]), + {ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]), ?HTTP_BRIDGE(URL2)), ?assertMatch( #{ <<"code">> := _ @@ -206,11 +218,15 @@ t_start_stop_bridges(_) -> Port = start_http_server(fun handle_fun_200_ok/1), URL1 = ?URL(Port, "abc"), {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}), + ?HTTP_BRIDGE(URL1)#{ + <<"type">> => ?BRIDGE_TYPE, + <<"name">> => ?BRIDGE_NAME + }), %ct:pal("the bridge ==== ~p", [Bridge]), ?assertMatch( - #{ <<"id">> := ?TEST_ID - , <<"bridge_type">> := <<"http">> + #{ <<"id">> := ?BRIDGE_ID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ , <<"node_status">> := [_|_] , <<"metrics">> := _ @@ -219,42 +235,42 @@ t_start_stop_bridges(_) -> }, jsx:decode(Bridge)), %% stop it {ok, 200, <<>>} = request(post, - uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]), + uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]), <<"">>), - {ok, 200, Bridge2} = request(get, uri(["bridges", ?TEST_ID]), []), - ?assertMatch(#{ <<"id">> := ?TEST_ID + {ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID , <<"status">> := <<"disconnected">> }, jsx:decode(Bridge2)), %% start again {ok, 200, <<>>} = request(post, - uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "start"]), + uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "start"]), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []), - ?assertMatch(#{ <<"id">> := ?TEST_ID + {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID , <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% restart an already started bridge {ok, 200, <<>>} = request(post, - uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]), + uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []), - ?assertMatch(#{ <<"id">> := ?TEST_ID + {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID , <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% stop it again {ok, 200, <<>>} = request(post, - uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]), + uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]), <<"">>), %% restart a stopped bridge {ok, 200, <<>>} = request(post, - uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]), + uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]), <<"">>), - {ok, 200, Bridge4} = request(get, uri(["bridges", ?TEST_ID]), []), - ?assertMatch(#{ <<"id">> := ?TEST_ID + {ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID , <<"status">> := <<"connected">> }, jsx:decode(Bridge4)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). %%-------------------------------------------------------------------- diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf index 06395ac94..8929598be 100644 --- a/apps/emqx_connector/etc/emqx_connector.conf +++ b/apps/emqx_connector/etc/emqx_connector.conf @@ -1,4 +1,5 @@ #connectors.mqtt.my_mqtt_connector { +# mode = cluster_shareload # server = "127.0.0.1:1883" # proto_ver = "v4" # username = "username1" @@ -8,7 +9,6 @@ # retry_interval = "30s" # max_inflight = 32 # reconnect_interval = "30s" -# bridge_mode = true # replayq { # dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" # seg_bytes = "100MB" diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 7e934b997..95bc33a83 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -30,6 +30,8 @@ %% API callbacks -export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]). +-define(CONN_TYPES, [mqtt]). + -define(TRY_PARSE_ID(ID, EXPR), try emqx_connector:parse_connector_id(Id) of {ConnType, ConnName} -> @@ -38,7 +40,7 @@ catch error:{invalid_bridge_id, Id0} -> {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary, - ". Bridge ID must be of format 'bridge_type:name'">>}} + ". Bridge Ids must be of format {type}:{name}">>}} end). namespace() -> "connector". @@ -53,17 +55,71 @@ error_schema(Code, Message) -> , {message, mk(string(), #{example => Message})} ]. -connector_info() -> - hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info") - ]). +put_request_body_schema() -> + emqx_dashboard_swagger:schema_with_examples( + emqx_connector_schema:put_request(), connector_info_examples(put)). -connector_test_info() -> - hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_test_info") - ]). +post_request_body_schema() -> + emqx_dashboard_swagger:schema_with_examples( + emqx_connector_schema:post_request(), connector_info_examples(post)). -connector_req() -> - hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector") - ]). +get_response_body_schema() -> + emqx_dashboard_swagger:schema_with_examples( + emqx_connector_schema:get_response(), connector_info_examples(get)). + +connector_info_array_example(Method) -> + [Config || #{value := Config} <- maps:values(connector_info_examples(Method))]. + +connector_info_examples(Method) -> + lists:foldl(fun(Type, Acc) -> + SType = atom_to_list(Type), + maps:merge(Acc, #{ + Type => #{ + summary => bin(string:uppercase(SType) ++ " Connector"), + value => info_example(Type, Method) + } + }) + end, #{}, ?CONN_TYPES). + +info_example(Type, Method) -> + maps:merge(info_example_basic(Type), + method_example(Type, Method)). + +method_example(Type, get) -> + SType = atom_to_list(Type), + SName = "my_" ++ SType ++ "_connector", + #{ + id => bin(SType ++ ":" ++ SName), + type => bin(SType), + name => bin(SName) + }; +method_example(Type, post) -> + SType = atom_to_list(Type), + SName = "my_" ++ SType ++ "_connector", + #{ + type => bin(SType), + name => bin(SName) + }; +method_example(_Type, put) -> + #{}. + +info_example_basic(mqtt) -> + #{ + mode => cluster_shareload, + server => <<"127.0.0.1:1883">>, + reconnect_interval => <<"30s">>, + proto_ver => <<"v4">>, + username => <<"foo">>, + password => <<"bar">>, + clientid => <<"foo">>, + clean_start => true, + keepalive => <<"300s">>, + retry_interval => <<"30s">>, + max_inflight => 100, + ssl => #{ + enable => false + } + }. param_path_id() -> [{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}]. @@ -74,9 +130,9 @@ schema("/connectors_test") -> post => #{ tags => [<<"connectors">>], description => <<"Test creating a new connector by given Id
" - "The ID must be of format 'type:name'">>, + "The ID must be of format '{type}:{name}'">>, summary => <<"Test creating connector">>, - requestBody => connector_test_info(), + requestBody => post_request_body_schema(), responses => #{ 200 => <<"Test connector OK">>, 400 => error_schema('TEST_FAILED', "connector test failed") @@ -92,17 +148,19 @@ schema("/connectors") -> description => <<"List all connectors">>, summary => <<"List connectors">>, responses => #{ - 200 => mk(array(connector_info()), #{desc => "List of connectors"}) + 200 => emqx_dashboard_swagger:schema_with_example( + array(emqx_connector_schema:get_response()), + connector_info_array_example(get)) } }, post => #{ tags => [<<"connectors">>], description => <<"Create a new connector by given Id
" - "The ID must be of format 'type:name'">>, + "The ID must be of format '{type}:{name}'">>, summary => <<"Create connector">>, - requestBody => connector_info(), + requestBody => post_request_body_schema(), responses => #{ - 201 => connector_info(), + 201 => get_response_body_schema(), 400 => error_schema('ALREADY_EXISTS', "connector already exists") } } @@ -117,7 +175,7 @@ schema("/connectors/:id") -> summary => <<"Get connector">>, parameters => param_path_id(), responses => #{ - 200 => connector_info(), + 200 => get_response_body_schema(), 404 => error_schema('NOT_FOUND', "Connector not found") } }, @@ -126,9 +184,9 @@ schema("/connectors/:id") -> description => <<"Update an existing connector by Id">>, summary => <<"Update connector">>, parameters => param_path_id(), - requestBody => connector_req(), + requestBody => put_request_body_schema(), responses => #{ - 200 => <<"Update connector successfully">>, + 200 => get_response_body_schema(), 400 => error_schema('UPDATE_FAIL', "Update failed"), 404 => error_schema('NOT_FOUND', "Connector not found") }}, @@ -143,8 +201,8 @@ schema("/connectors/:id") -> }} }. -'/connectors_test'(post, #{body := #{<<"bridge_type">> := ConnType} = Params}) -> - case emqx_connector:create_dry_run(ConnType, maps:remove(<<"bridge_type">>, Params)) of +'/connectors_test'(post, #{body := #{<<"type">> := ConnType} = Params}) -> + case emqx_connector:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of ok -> {200}; {error, Error} -> {400, error_msg('BAD_ARG', Error)} @@ -153,17 +211,20 @@ schema("/connectors/:id") -> '/connectors'(get, _Request) -> {200, emqx_connector:list()}; -'/connectors'(post, #{body := #{<<"id">> := Id} = Params}) -> - ?TRY_PARSE_ID(Id, - case emqx_connector:lookup(ConnType, ConnName) of - {ok, _} -> - {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; - {error, not_found} -> - case emqx_connector:update(ConnType, ConnName, maps:remove(<<"id">>, Params)) of - {ok, #{raw_config := RawConf}} -> {201, RawConf#{<<"id">> => Id}}; - {error, Error} -> {400, error_msg('BAD_ARG', Error)} - end - end). +'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) -> + ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()), + case emqx_connector:lookup(ConnType, ConnName) of + {ok, _} -> + {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; + {error, not_found} -> + case emqx_connector:update(ConnType, ConnName, + maps:without([<<"type">>, <<"name">>], Params)) of + {ok, #{raw_config := RawConf}} -> + {201, RawConf#{<<"id">> => + emqx_connector:connector_id(ConnType, ConnName)}}; + {error, Error} -> {400, error_msg('BAD_ARG', Error)} + end + end. '/connectors/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, @@ -200,4 +261,7 @@ schema("/connectors/:id") -> error_msg(Code, Msg) when is_binary(Msg) -> #{code => Code, message => Msg}; error_msg(Code, Msg) -> - #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}. + #{code => Code, message => bin(io_lib:format("~p", [Msg]))}. + +bin(S) when is_list(S) -> + list_to_binary(S). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 1acd8b298..6bc609fa8 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -40,6 +40,8 @@ -behaviour(hocon_schema). +-import(hoconsc, [mk/2]). + -export([ roots/0 , fields/1]). @@ -49,7 +51,25 @@ roots() -> fields("config"). fields("config") -> - emqx_connector_mqtt_schema:fields("config"). + emqx_connector_mqtt_schema:fields("config"); + +fields("get") -> + [{id, mk(binary(), + #{ desc => "The connector Id" + , example => <<"mqtt:my_mqtt_connector">> + })}] + ++ fields("post"); + +fields("put") -> + emqx_connector_mqtt_schema:fields("connector"); + +fields("post") -> + [ {type, mk(mqtt, #{desc => "The Connector Type"})} + , {name, mk(binary(), + #{ desc => "The Connector Name" + , example => <<"my_mqtt_connector">> + })} + ] ++ fields("put"). %% =================================================================== %% supervisor APIs @@ -100,7 +120,7 @@ on_start(InstId, Conf) -> BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ name => InstanceId, - clientid => clientid(InstanceId), + clientid => clientid(maps:get(clientid, Conf, InstId)), subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, @@ -162,7 +182,6 @@ basic_config(#{ server := Server, reconnect_interval := ReconnIntv, proto_ver := ProtoVer, - bridge_mode := BridgeMod, username := User, password := Password, clean_start := CleanStart, @@ -177,7 +196,7 @@ basic_config(#{ server => Server, reconnect_interval => ReconnIntv, proto_ver => ProtoVer, - bridge_mode => BridgeMod, + bridge_mode => true, username => User, password => Password, clean_start => CleanStart, @@ -190,4 +209,4 @@ basic_config(#{ }. clientid(Id) -> - list_to_binary(lists:concat([Id, ":", node()])). + iolist_to_binary([Id, ":", atom_to_list(node())]). diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index 3caf2b595..c386a829f 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -4,8 +4,33 @@ -include_lib("typerefl/include/types.hrl"). +-import(hoconsc, [mk/2, ref/2]). + -export([roots/0, fields/1]). +-export([ get_response/0 + , put_request/0 + , post_request/0 + ]). + +-define(CONN_TYPES, [mqtt]). + +%%====================================================================================== +%% For HTTP APIs + +get_response() -> + http_schema("get"). + +put_request() -> + http_schema("put"). + +post_request() -> + http_schema("post"). + +http_schema(Method) -> + Schemas = [ref(schema_mod(Type), Method) || Type <- ?CONN_TYPES], + hoconsc:union(Schemas). + %%====================================================================================== %% Hocon Schema Definitions @@ -14,24 +39,12 @@ roots() -> ["connectors"]. fields(connectors) -> fields("connectors"); fields("connectors") -> [ {mqtt, - sc(hoconsc:map(name, - hoconsc:union([ ref("mqtt_connector") + mk(hoconsc:map(name, + hoconsc:union([ ref(emqx_connector_mqtt_schema, "connector") ])), #{ desc => "MQTT bridges" })} - ]; + ]. -fields("mqtt_connector") -> - emqx_connector_mqtt_schema:fields("connector"); - -fields("mqtt_connector_info") -> - [{id, sc(binary(), #{desc => "The connector Id"})}] - ++ fields("mqtt_connector"); - -fields("mqtt_connector_test_info") -> - [{bridge_type, sc(mqtt, #{desc => "The Bridge Type"})}] - ++ fields("mqtt_connector"). - -sc(Type, Meta) -> hoconsc:mk(Type, Meta). - -ref(Field) -> hoconsc:ref(?MODULE, Field). +schema_mod(Type) -> + list_to_atom(lists:concat(["emqx_connector_", Type])). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 6436a4c96..2338129d1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -8,7 +8,7 @@ %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, +%% cluster_shareload under the License is cluster_shareload on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. @@ -38,7 +38,24 @@ fields("config") -> topic_mappings(); fields("connector") -> - [ {server, + [ {mode, + sc(hoconsc:enum([cluster_singleton, cluster_shareload]), + #{ default => cluster_shareload + , desc => """ +The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'
+ +- cluster_singleton: create an unique MQTT connection within the emqx cluster.
+In 'cluster_singleton' node, all messages toward the remote broker go through the same +MQTT connection.
+- cluster_shareload: create an MQTT connection on each node in the emqx cluster.
+In 'cluster_shareload' mode, the incomming load from the remote broker is shared by +using shared subscription.
+Note that the 'clientid' is suffixed by the node name, this is to avoid +clientid conflicts between different nodes. And we can only use shared subscription +topic filters for 'from_remote_topic'. +""" + })} + , {server, sc(emqx_schema:ip_port(), #{ default => "127.0.0.1:1883" , desc => "The host and port of the remote MQTT broker" @@ -49,11 +66,6 @@ fields("connector") -> #{ default => v4 , desc => "The MQTT protocol version" })} - , {bridge_mode, - sc(boolean(), - #{ default => true - , desc => "The bridge mode of the MQTT protocol" - })} , {username, sc(binary(), #{ default => "emqx" @@ -66,8 +78,7 @@ fields("connector") -> })} , {clientid, sc(binary(), - #{ default => "emqx_${nodename}" - , desc => "The clientid of the MQTT protocol" + #{ desc => "The clientid of the MQTT protocol" })} , {clean_start, sc(boolean(), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 96f530563..bbac76674 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -24,7 +24,11 @@ -define(CONF_DEFAULT, <<"connectors: {}">>). -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). +-define(CONNECTR_TYPE, <<"mqtt">>). +-define(CONNECTR_NAME, <<"test_connector">>). -define(CONNECTR_ID, <<"mqtt:test_connector">>). +-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>). +-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>). -define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>). -define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>). -define(MQTT_CONNECOTR(Username), @@ -63,8 +67,8 @@ -define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{<<"matched">> := MATCH, <<"success">> := SUCC, - <<"failed">> := FAILED, <<"speed">> := SPEED, - <<"speed_last5m">> := SPEED5M, <<"speed_max">> := SPEEDMAX}). + <<"failed">> := FAILED, <<"rate">> := SPEED, + <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). all() -> emqx_common_test_helpers:all(?MODULE). @@ -115,7 +119,9 @@ t_mqtt_crud_apis(_) -> %% POST /connectors/ will create a connector User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -128,7 +134,9 @@ t_mqtt_crud_apis(_) -> %% create a again returns an error {ok, 400, RetMsg} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), ?assertMatch( #{ <<"code">> := _ , <<"message">> := <<"connector already exists">> @@ -187,7 +195,9 @@ t_mqtt_conn_bridge_ingress(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -201,11 +211,14 @@ t_mqtt_conn_bridge_ingress(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_INGRESS}), + ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_INGRESS + }), %ct:pal("---bridge: ~p", [Bridge]), ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS - , <<"bridge_type">> := <<"mqtt">> + , <<"type">> := <<"mqtt">> , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), @@ -250,7 +263,9 @@ t_mqtt_conn_bridge_egress(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -264,11 +279,15 @@ t_mqtt_conn_bridge_egress(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}), + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), %ct:pal("---bridge: ~p", [Bridge]), ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"bridge_type">> := <<"mqtt">> + , <<"type">> := ?CONNECTR_TYPE + , <<"name">> := ?BRIDGE_NAME_EGRESS , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), @@ -322,7 +341,10 @@ t_mqtt_conn_update(_) -> %% then we add a mqtt connector, using POST {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) + #{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -332,9 +354,13 @@ t_mqtt_conn_update(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}), + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"bridge_type">> := <<"mqtt">> + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), @@ -358,9 +384,15 @@ t_mqtt_conn_testing(_) -> %% APIs for testing the connectivity %% then we add a mqtt connector, using POST {ok, 200, <<>>} = request(post, uri(["connectors_test"]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"bridge_type">> => <<"mqtt">>}), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), {ok, 400, _} = request(post, uri(["connectors_test"]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{<<"bridge_type">> => <<"mqtt">>}). + ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }). %%-------------------------------------------------------------------- %% HTTP Request diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 824890efc..d48b10dd1 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -27,7 +27,7 @@ -export([ inc/3 , inc/4 , get/3 - , get_speed/2 + , get_rate/2 , create_metrics/2 , clear_metrics/2 ]). @@ -54,7 +54,7 @@ -define(SECS_5M, 300). -define(SAMPLING, 10). -else. -%% Use 5 secs average speed instead of 5 mins in case of testing +%% Use 5 secs average rate instead of 5 mins in case of testing -define(SECS_5M, 5). -define(SAMPLING, 1). -endif. @@ -65,9 +65,9 @@ matched => integer(), success => integer(), failed => integer(), - speed => float(), - speed_max => float(), - speed_last5m => float() + rate => float(), + rate_max => float(), + rate_last5m => float() }. -type handler_name() :: atom(). -type metric_id() :: binary(). @@ -75,22 +75,22 @@ -define(CntrRef(Name), {?MODULE, Name}). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). -%% the speed of 'matched' --record(speed, { +%% the rate of 'matched' +-record(rate, { max = 0 :: number(), current = 0 :: number(), last5m = 0 :: number(), - %% metadata for calculating the avg speed + %% metadata for calculating the avg rate tick = 1 :: number(), last_v = 0 :: number(), - %% metadata for calculating the 5min avg speed + %% metadata for calculating the 5min avg rate last5m_acc = 0 :: number(), last5m_smpl = [] :: list() }). -record(state, { metric_ids = sets:new(), - speeds :: undefined | #{metric_id() => #speed{}} + rates :: undefined | #{metric_id() => #rate{}} }). %%------------------------------------------------------------------------------ @@ -122,19 +122,19 @@ get(Name, Id, Metric) -> Ref -> counters:get(Ref, metrics_idx(Metric)) end. --spec(get_speed(handler_name(), metric_id()) -> map()). -get_speed(Name, Id) -> - gen_server:call(Name, {get_speed, Id}). +-spec(get_rate(handler_name(), metric_id()) -> map()). +get_rate(Name, Id) -> + gen_server:call(Name, {get_rate, Id}). -spec(get_metrics(handler_name(), metric_id()) -> metrics()). get_metrics(Name, Id) -> - #{max := Max, current := Current, last5m := Last5M} = get_speed(Name, Id), + #{max := Max, current := Current, last5m := Last5M} = get_rate(Name, Id), #{matched => get_matched(Name, Id), success => get_success(Name, Id), failed => get_failed(Name, Id), - speed => Current, - speed_max => Max, - speed_last5m => Last5M + rate => Current, + rate_max => Max, + rate_last5m => Last5M }. -spec inc(handler_name(), metric_id(), atom()) -> ok. @@ -176,35 +176,35 @@ start_link(Name) -> init(Name) -> erlang:process_flag(trap_exit, true), - %% the speed metrics + %% the rate metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), persistent_term:put(?CntrRef(Name), #{}), {ok, #state{}}. -handle_call({get_speed, _Id}, _From, State = #state{speeds = undefined}) -> - {reply, format_speed(#speed{}), State}; -handle_call({get_speed, Id}, _From, State = #state{speeds = Speeds}) -> - {reply, case maps:get(Id, Speeds, undefined) of - undefined -> format_speed(#speed{}); - Speed -> format_speed(Speed) +handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> + {reply, format_rate(#rate{}), State}; +handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> + {reply, case maps:get(Id, Rates, undefined) of + undefined -> format_rate(#rate{}); + Rate -> format_rate(Rate) end, State}; handle_call({create_metrics, Id}, _From, - State = #state{metric_ids = MIDs, speeds = Speeds}) -> + State = #state{metric_ids = MIDs, rates = Rates}) -> {reply, create_counters(get_self_name(), Id), State#state{metric_ids = sets:add_element(Id, MIDs), - speeds = case Speeds of - undefined -> #{Id => #speed{}}; - _ -> Speeds#{Id => #speed{}} + rates = case Rates of + undefined -> #{Id => #rate{}}; + _ -> Rates#{Id => #rate{}} end}}; handle_call({delete_metrics, Id}, _From, - State = #state{metric_ids = MIDs, speeds = Speeds}) -> + State = #state{metric_ids = MIDs, rates = Rates}) -> {reply, delete_counters(get_self_name(), Id), State#state{metric_ids = sets:del_element(Id, MIDs), - speeds = case Speeds of + rates = case Rates of undefined -> undefined; - _ -> maps:remove(Id, Speeds) + _ -> maps:remove(Id, Rates) end}}; handle_call(_Request, _From, State) -> @@ -213,17 +213,17 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(ticking, State = #state{speeds = undefined}) -> +handle_info(ticking, State = #state{rates = undefined}) -> erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State}; -handle_info(ticking, State = #state{speeds = Speeds0}) -> - Speeds = maps:map( - fun(Id, Speed) -> - calculate_speed(get_matched(get_self_name(), Id), Speed) - end, Speeds0), +handle_info(ticking, State = #state{rates = Rates0}) -> + Rates = maps:map( + fun(Id, Rate) -> + calculate_rate(get_matched(get_self_name(), Id), Rate) + end, Rates0), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), - {noreply, State#state{speeds = Speeds}}; + {noreply, State#state{rates = Rates}}; handle_info(_Info, State) -> {noreply, State}. @@ -261,38 +261,38 @@ get_couters_ref(Name, Id) -> get_all_counters(Name) -> persistent_term:get(?CntrRef(Name), #{}). -calculate_speed(_CurrVal, undefined) -> +calculate_rate(_CurrVal, undefined) -> undefined; -calculate_speed(CurrVal, #speed{max = MaxSpeed0, last_v = LastVal, - tick = Tick, last5m_acc = AccSpeed5Min0, +calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal, + tick = Tick, last5m_acc = AccRate5Min0, last5m_smpl = Last5MinSamples0}) -> - %% calculate the current speed based on the last value of the counter - CurrSpeed = (CurrVal - LastVal) / ?SAMPLING, + %% calculate the current rate based on the last value of the counter + CurrRate = (CurrVal - LastVal) / ?SAMPLING, - %% calculate the max speed since the emqx startup - MaxSpeed = - if MaxSpeed0 >= CurrSpeed -> MaxSpeed0; - true -> CurrSpeed + %% calculate the max rate since the emqx startup + MaxRate = + if MaxRate0 >= CurrRate -> MaxRate0; + true -> CurrRate end, - %% calculate the average speed in last 5 mins + %% calculate the average rate in last 5 mins {Last5MinSamples, Acc5Min, Last5Min} = if Tick =< ?SAMPCOUNT_5M -> - Acc = AccSpeed5Min0 + CurrSpeed, - {lists:reverse([CurrSpeed | lists:reverse(Last5MinSamples0)]), + Acc = AccRate5Min0 + CurrRate, + {lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]), Acc, Acc / Tick}; true -> - [FirstSpeed | Speeds] = Last5MinSamples0, - Acc = AccSpeed5Min0 + CurrSpeed - FirstSpeed, - {lists:reverse([CurrSpeed | lists:reverse(Speeds)]), + [FirstRate | Rates] = Last5MinSamples0, + Acc = AccRate5Min0 + CurrRate - FirstRate, + {lists:reverse([CurrRate | lists:reverse(Rates)]), Acc, Acc / ?SAMPCOUNT_5M} end, - #speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min, + #rate{max = MaxRate, current = CurrRate, last5m = Last5Min, last_v = CurrVal, last5m_acc = Acc5Min, last5m_smpl = Last5MinSamples, tick = Tick + 1}. -format_speed(#speed{max = Max, current = Current, last5m = Last5Min}) -> +format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) -> #{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}. precision(Float, N) -> diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl index 3a74cd232..3f8a63f25 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl @@ -24,7 +24,7 @@ all() -> [ {group, metrics} - , {group, speed} ]. + , {group, rate} ]. suite() -> [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. @@ -34,8 +34,8 @@ groups() -> [ t_rule , t_no_creation_1 ]}, - {speed, [sequence], - [ rule_speed + {rate, [sequence], + [ rule_rate ]} ]. @@ -74,7 +74,7 @@ t_rule(_) -> ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>). -rule_speed(_) -> +rule_rate(_) -> ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>), ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), @@ -83,11 +83,11 @@ rule_speed(_) -> ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), ct:sleep(1000), ?LET(#{max := Max, current := Current}, - emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>), + emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current =< 2)}), ct:sleep(2100), - ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>), + ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)}), diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 448f63138..1fe75447e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -32,13 +32,40 @@ check_params(Params, Tag) -> roots() -> [ {"rule_creation", sc(ref("rule_creation"), #{desc => "Schema for creating rules"})} + , {"rule_info", sc(ref("rule_info"), #{desc => "Schema for rule info"})} + , {"rule_events", sc(ref("rule_events"), #{desc => "Schema for rule events"})} , {"rule_test", sc(ref("rule_test"), #{desc => "Schema for testing rules"})} ]. fields("rule_creation") -> - [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})} + [ {"id", sc(binary(), + #{ desc => "The Id of the rule", nullable => false + , example => "my_rule_id" + })} ] ++ emqx_rule_engine_schema:fields("rules"); +fields("rule_info") -> + [ {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})} + , {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})} + , {"from", sc(hoconsc:array(binary()), + #{desc => "The topics of the rule", example => "t/#"})} + , {"created_at", sc(binary(), + #{ desc => "The created time of the rule" + , example => "2021-12-01T15:00:43.153+08:00" + })} + ] ++ fields("rule_creation"); + +%% TODO: we can delete this API if the Dashboard not denpends on it +fields("rule_events") -> + ETopics = [emqx_rule_events:event_topic(E) || E <- emqx_rule_events:event_names()], + [ {"event", sc(hoconsc:enum(ETopics), #{desc => "The event topics", nullable => false})} + , {"title", sc(binary(), #{desc => "The title", example => "some title"})} + , {"description", sc(binary(), #{desc => "The description", example => "some desc"})} + , {"columns", sc(map(), #{desc => "The columns"})} + , {"test_columns", sc(map(), #{desc => "The test columns"})} + , {"sql_example", sc(binary(), #{desc => "The sql_example"})} + ]; + fields("rule_test") -> [ {"context", sc(hoconsc:union([ ref("ctx_pub") , ref("ctx_sub") @@ -53,6 +80,18 @@ fields("rule_test") -> , {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})} ]; +fields("metrics") -> + [ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})} + , {"rate", sc(float(), #{desc => "The rate of matched, times/second"})} + , {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})} + , {"rate_last5m", sc(float(), + #{desc => "The average rate of matched in last 5 mins, times/second"})} + ]; + +fields("node_metrics") -> + [ {"node", sc(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})} + ] ++ fields("metrics"); + fields("ctx_pub") -> [ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})} , {"id", sc(binary(), #{desc => "Message ID"})} diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 9e341b388..75238fb71 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -18,16 +18,17 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("typerefl/include/types.hrl"). -behaviour(minirest_api). --export([api_spec/0]). +-import(hoconsc, [mk/2, ref/2, array/1]). --export([ crud_rules/2 - , list_events/2 - , crud_rules_by_id/2 - , rule_test/2 - ]). +%% Swagger specs from hocon schema +-export([api_spec/0, paths/0, schema/1, namespace/0]). + +%% API callbacks +-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))). -define(ERR_BADARGS(REASON), @@ -43,210 +44,130 @@ {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}} end). +namespace() -> "rule". + api_spec() -> - { - [ api_rules_list_create() - , api_rules_crud() - , api_rule_test() - , api_events_list() - ], - [] - }. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). -api_rules_list_create() -> - Metadata = #{ +paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id"]. + +error_schema(Code, Message) -> + [ {code, mk(string(), #{example => Code})} + , {message, mk(string(), #{example => Message})} + ]. + +rule_creation_schema() -> + ref(emqx_rule_api_schema, "rule_creation"). + +rule_update_schema() -> + ref(emqx_rule_engine_schema, "rules"). + +rule_test_schema() -> + ref(emqx_rule_api_schema, "rule_test"). + +rule_info_schema() -> + ref(emqx_rule_api_schema, "rule_info"). + +schema("/rules") -> + #{ + operationId => '/rules', get => #{ + tags => [<<"rules">>], description => <<"List all rules">>, + summary => <<"List Rules">>, responses => #{ - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}}, + 200 => mk(array(rule_info_schema()), #{desc => "List of rules"}) + }}, post => #{ - description => <<"Create a new rule using given Id to all nodes in the cluster">>, - 'requestBody' => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>), + tags => [<<"rules">>], + description => <<"Create a new rule using given Id">>, + summary => <<"Create a Rule">>, + requestBody => rule_creation_schema(), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), - <<"201">> => - emqx_mgmt_util:schema(resp_schema(), <<"Create rule successfully">>)}} - }, - {"/rules", Metadata, crud_rules}. + 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 201 => rule_info_schema() + }} + }; -api_events_list() -> - Metadata = #{ +schema("/rule_events") -> + #{ + operationId => '/rule_events', get => #{ + tags => [<<"rules">>], description => <<"List all events can be used in rules">>, + summary => <<"List Events">>, responses => #{ - <<"200">> => - emqx_mgmt_util:array_schema(resp_schema(), <<"List events successfully">>)}} - }, - {"/rule_events", Metadata, list_events}. + 200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{}) + } + } + }; -api_rules_crud() -> - Metadata = #{ +schema("/rules/:id") -> + #{ + operationId => '/rules/:id', get => #{ + tags => [<<"rules">>], description => <<"Get a rule by given Id">>, - parameters => [param_path_id()], + summary => <<"Get a Rule">>, + parameters => param_path_id(), responses => #{ - <<"404">> => - emqx_mgmt_util:error_schema(<<"Rule not found">>, ['NOT_FOUND']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}}, + 404 => error_schema('NOT_FOUND', "Rule not found"), + 200 => rule_info_schema() + } + }, put => #{ - description => <<"Create or update a rule by given Id to all nodes in the cluster">>, - parameters => [param_path_id()], - 'requestBody' => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>), + tags => [<<"rules">>], + description => <<"Update a rule by given Id to all nodes in the cluster">>, + summary => <<"Update a Rule">>, + parameters => param_path_id(), + requestBody => rule_update_schema(), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), - <<"200">> => - emqx_mgmt_util:schema(resp_schema(), - <<"Create or update rule successfully">>)}}, + 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 200 => rule_info_schema() + } + }, delete => #{ + tags => [<<"rules">>], description => <<"Delete a rule by given Id from all nodes in the cluster">>, - parameters => [param_path_id()], + summary => <<"Delete a Rule">>, + parameters => param_path_id(), responses => #{ - <<"204">> => - emqx_mgmt_util:schema(<<"Delete rule successfully">>)}} - }, - {"/rules/:id", Metadata, crud_rules_by_id}. + 204 => <<"Delete rule successfully">> + } + } + }; -api_rule_test() -> - Metadata = #{ +schema("/rule_test") -> + #{ + operationId => '/rule_test', post => #{ + tags => [<<"rules">>], description => <<"Test a rule">>, - 'requestBody' => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>), + summary => <<"Test a Rule">>, + requestBody => rule_test_schema(), responses => #{ - <<"400">> => - emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), - <<"412">> => - emqx_mgmt_util:error_schema(<<"SQL Not Match">>, ['NOT_MATCH']), - <<"200">> => - emqx_mgmt_util:schema(rule_test_resp_schema(), <<"Rule Test Pass">>)}} - }, - {"/rule_test", Metadata, rule_test}. - -put_req_schema() -> - #{type => object, - properties => #{ - sql => #{ - description => <<"The SQL">>, - type => string, - example => <<"SELECT * from \"t/1\"">> - }, - enable => #{ - description => <<"Enable or disable the rule">>, - type => boolean, - example => true - }, - outputs => #{ - description => <<"The outputs of the rule">>, - type => array, - items => #{ - 'oneOf' => [ - #{ - type => string, - example => <<"channel_id_of_my_bridge">>, - description => <<"The channel id of an emqx bridge">> - }, - #{ - type => object, - properties => #{ - function => #{ - type => string, - example => <<"console">> - } - } - } - ] + 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 412 => error_schema('NOT_MATCH', "SQL Not Match"), + 200 => <<"Rule Test Pass">> } - }, - description => #{ - description => <<"The description for the rule">>, - type => string, - example => <<"A simple rule that handles MQTT messages from topic \"t/1\"">> } - } }. -post_req_schema() -> - Req = #{properties := Prop} = put_req_schema(), - Req#{properties => Prop#{ - id => #{ - description => <<"The Id for the rule">>, - example => <<"my_rule">>, - type => string - } - }}. - -resp_schema() -> - Req = #{properties := Prop} = put_req_schema(), - Req#{properties => Prop#{ - id => #{ - description => <<"The Id for the rule">>, - type => string - }, - created_at => #{ - description => <<"The time that this rule was created, in rfc3339 format">>, - type => string, - example => <<"2021-09-18T13:57:29+08:00">> - } - }}. - -rule_test_req_schema() -> - #{type => object, properties => #{ - sql => #{ - description => <<"The SQL">>, - type => string, - example => <<"SELECT * from \"t/1\"">> - }, - context => #{ - type => object, - properties => #{ - event_type => #{ - description => <<"Event Type">>, - type => string, - enum => [<<"message_publish">>, <<"message_acked">>, <<"message_delivered">>, - <<"message_dropped">>, <<"session_subscribed">>, <<"session_unsubscribed">>, - <<"client_connected">>, <<"client_disconnected">>], - example => <<"message_publish">> - }, - clientid => #{ - description => <<"The Client ID">>, - type => string, - example => <<"\"c_emqx\"">> - }, - topic => #{ - description => <<"The Topic">>, - type => string, - example => <<"t/1">> - } - } - } - }}. - -rule_test_resp_schema() -> - #{type => object}. - param_path_id() -> - #{ - name => id, - in => path, - schema => #{type => string}, - required => true - }. + [{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}]. %%------------------------------------------------------------------------------ %% Rules API %%------------------------------------------------------------------------------ -list_events(#{}, _Params) -> +'/rule_events'(get, _Params) -> {200, emqx_rule_events:event_info()}. -crud_rules(get, _Params) -> +'/rules'(get, _Params) -> Records = emqx_rule_engine:get_rules_ordered_by_ts(), {200, format_rule_resp(Records)}; -crud_rules(post, #{body := #{<<"id">> := Id} = Params}) -> +'/rules'(post, #{body := #{<<"id">> := Id} = Params}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> @@ -263,13 +184,13 @@ crud_rules(post, #{body := #{<<"id">> := Id} = Params}) -> end end. -rule_test(post, #{body := Params}) -> +'/rule_test'(post, #{body := Params}) -> ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of {ok, Result} -> {200, Result}; {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}} end). -crud_rules_by_id(get, #{bindings := #{id := Id}}) -> +'/rules/:id'(get, #{bindings := #{id := Id}}) -> case emqx_rule_engine:get_rule(Id) of {ok, Rule} -> {200, format_rule_resp(Rule)}; @@ -277,7 +198,7 @@ crud_rules_by_id(get, #{bindings := #{id := Id}}) -> {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}} end; -crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) -> +'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> @@ -289,7 +210,7 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) -> {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} end; -crud_rules_by_id(delete, #{bindings := #{id := Id}}) -> +'/rules/:id'(delete, #{bindings := #{id := Id}}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx:remove_config(ConfPath, #{}) of {ok, _} -> {204}; @@ -315,11 +236,13 @@ format_rule_resp(#{ id := Id, created_at := CreatedAt, sql := SQL, enabled := Enabled, description := Descr}) -> + NodeMetrics = get_rule_metrics(Id), #{id => Id, from => Topics, outputs => format_output(Output), sql => SQL, - metrics => get_rule_metrics(Id), + metrics => aggregate_metrics(NodeMetrics), + node_metrics => NodeMetrics, enabled => Enabled, created_at => format_datetime(CreatedAt, millisecond), description => Descr @@ -339,19 +262,28 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> get_rule_metrics(Id) -> Format = fun (Node, #{matched := Matched, - speed := Current, - speed_max := Max, - speed_last5m := Last5M + rate := Current, + rate_max := Max, + rate_last5m := Last5M }) -> #{ matched => Matched - , speed => Current - , speed_max => Max - , speed_last5m => Last5M + , rate => Current + , rate_max => Max + , rate_last5m => Last5M , node => Node } end, [Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id])) || Node <- mria_mnesia:running_nodes()]. +aggregate_metrics(AllMetrics) -> + InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0}, + lists:foldl(fun + (#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1}, + #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) -> + #{matched => Match1 + Match0, rate => Rate1 + Rate0, + rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0} + end, InitMetrics, AllMetrics). + get_one_rule(AllRules, Id) -> [R || R = #{id := Id0} <- AllRules, Id0 == Id]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 995044fc7..93661ab53 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -44,19 +44,17 @@ fields("rules") -> SQL query to transform the messages.
Example: SELECT * FROM \"test/topic\" WHERE payload.x = 1
""" + , example => "SELECT * FROM \"test/topic\" WHERE payload.x = 1" , nullable => false - , validator => fun ?MODULE:validate_sql/1})} - , {"outputs", sc(hoconsc:array(hoconsc:union( - [ binary() - , ref("builtin_output_republish") - , ref("builtin_output_console") - ])), + , validator => fun ?MODULE:validate_sql/1 + })} + , {"outputs", sc(hoconsc:array(hoconsc:union(outputs())), #{ desc => """ A list of outputs of the rule.
An output can be a string that refers to the channel Id of a emqx bridge, or a object that refers to a function.
There a some built-in functions like \"republish\" and \"console\", and we also support user -provided functions like \"ModuleName:FunctionName\".
+provided functions in the format: \"{module}:{function}\".
The outputs in the list is executed one by one in order. This means that if one of the output is executing slowly, all of the outputs comes after it will not be executed until it returns.
@@ -66,9 +64,19 @@ If there's any error when running an output, there will be an error message, and counter of the function output or the bridge channel will increase. """ , default => [] + , example => [ + <<"http:my_http_bridge">>, + #{function => republish, args => #{ + topic => <<"t/1">>, payload => <<"${payload}">>}}, + #{function => console} + ] })} , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})} - , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})} + , {"description", sc(binary(), + #{ desc => "The description of the rule" + , example => "Some description" + , default => <<>> + })} ]; fields("builtin_output_republish") -> @@ -106,6 +114,27 @@ fields("builtin_output_console") -> % default => #{}})} ]; +fields("user_provided_function") -> + [ {function, sc(binary(), + #{ desc => """ +The user provided function. Should be in the format: '{module}:{function}'.
+Where the is the erlang callback module and the {function} is the erlang function.
+To write your own function, checkout the function console and +republish in the source file: +apps/emqx_rule_engine/src/emqx_rule_outputs.erl as an example. +""" + , example => "module:function" + })} + , {args, sc(map(), + #{ desc => """ +The args will be passed as the 3rd argument to module:function/3, +checkout the function console and republish in the source file: +apps/emqx_rule_engine/src/emqx_rule_outputs.erl as an example. +""" + , default => #{} + })} + ]; + fields("republish_args") -> [ {topic, sc(binary(), #{ desc =>""" @@ -113,8 +142,9 @@ The target topic of message to be re-published.
Template with variables is allowed, see description of the 'republish_args'. """ , nullable => false + , example => <<"a/1">> })} - , {qos, sc(binary(), + , {qos, sc(qos(), #{ desc => """ The qos of the message to be re-published. Template with with variables is allowed, see description of the 'republish_args.
@@ -122,8 +152,9 @@ Defaults to ${qos}. If variable ${qos} is not found from the selected result of 0 is used. """ , default => <<"${qos}">> + , example => <<"${qos}">> })} - , {retain, sc(binary(), + , {retain, sc(hoconsc:union([binary(), boolean()]), #{ desc => """ The retain flag of the message to be re-published. Template with with variables is allowed, see description of the 'republish_args.
@@ -131,6 +162,7 @@ Defaults to ${retain}. If variable ${retain} is not found from the selected resu of the rule, false is used. """ , default => <<"${retain}">> + , example => <<"${retain}">> })} , {payload, sc(binary(), #{ desc => """ @@ -140,9 +172,20 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re of the rule, then the string \"undefined\" is used. """ , default => <<"${payload}">> + , example => <<"${payload}">> })} ]. +outputs() -> + [ binary() + , ref("builtin_output_republish") + , ref("builtin_output_console") + , ref("user_provided_function") + ]. + +qos() -> + hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]). + validate_sql(Sql) -> case emqx_rule_sqlparser:parse(Sql) of {ok, _Result} -> ok; diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 0aff9f018..c61629b39 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -25,7 +25,9 @@ , load/1 , unload/0 , unload/1 + , event_names/0 , event_name/1 + , event_topic/1 , eventmsg_publish/1 ]). @@ -45,17 +47,6 @@ , columns_with_exam/1 ]). --define(SUPPORTED_HOOK, - [ 'client.connected' - , 'client.disconnected' - , 'session.subscribed' - , 'session.unsubscribed' - , 'message.publish' - , 'message.delivered' - , 'message.acked' - , 'message.dropped' - ]). - -ifdef(TEST). -export([ reason/1 , hook_fun/1 @@ -63,6 +54,17 @@ ]). -endif. +event_names() -> + [ 'client.connected' + , 'client.disconnected' + , 'session.subscribed' + , 'session.unsubscribed' + , 'message.publish' + , 'message.delivered' + , 'message.acked' + , 'message.dropped' + ]. + reload() -> lists:foreach(fun(Rule) -> ok = emqx_rule_engine:load_hooks_for_rule(Rule) @@ -78,7 +80,7 @@ load(Topic) -> unload() -> lists:foreach(fun(HookPoint) -> emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) - end, ?SUPPORTED_HOOK). + end, event_names()). unload(Topic) -> HookPoint = event_name(Topic), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 7b68b3ee3..1cabf3e32 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -247,9 +247,9 @@ handle_output(OutId, Selected, Envs) -> }) end. -do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) -> - ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}), - emqx_bridge:send_message(ChannelId, Selected); +do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> + ?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}), + emqx_bridge:send_message(BridgeId, Selected); do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> Mod:Func(Selected, Envs, Args). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index 712d113f9..4dd564b36 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -36,34 +36,34 @@ t_crud_rule_api(_Config) -> <<"outputs">> => [#{<<"function">> => <<"console">>}], <<"sql">> => <<"SELECT * from \"t/1\"">> }, - {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}), + {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params0}), %% if we post again with the same params, it return with 400 "rule id already exists" ?assertMatch({400, #{code := _, message := _Message}}, - emqx_rule_engine_api:crud_rules(post, #{body => Params0})), + emqx_rule_engine_api:'/rules'(post, #{body => Params0})), ?assertEqual(RuleID, maps:get(id, Rule)), - {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}), + {200, Rules} = emqx_rule_engine_api:'/rules'(get, #{}), ct:pal("RList : ~p", [Rules]), ?assert(length(Rules) > 0), - {200, Rule1} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}), + {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}), ct:pal("RShow : ~p", [Rule1]), ?assertEqual(Rule, Rule1), - {200, Rule2} = emqx_rule_engine_api:crud_rules_by_id(put, #{ + {200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{ bindings => #{id => RuleID}, body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>} }), - {200, Rule3} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}), + {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}), %ct:pal("RShow : ~p", [Rule3]), ?assertEqual(Rule3, Rule2), ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)), - ?assertMatch({204}, emqx_rule_engine_api:crud_rules_by_id(delete, + ?assertMatch({204}, emqx_rule_engine_api:'/rules/:id'(delete, #{bindings => #{id => RuleID}})), %ct:pal("Show After Deleted: ~p", [NotFound]), ?assertMatch({404, #{code := _, message := _Message}}, - emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}})), + emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}})), ok.