From e07fce791fe789ff7e0066d7bd31254f1fa8c5b8 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 1 Dec 2021 16:31:58 +0800
Subject: [PATCH 01/10] feat(connector): add examples to the swagger doc
---
.../emqx_connector/src/emqx_connector_api.erl | 83 ++++++++++++++-----
.../src/emqx_connector_mqtt.erl | 4 +-
.../src/emqx_connector_schema.erl | 11 ++-
.../src/mqtt/emqx_connector_mqtt_schema.erl | 3 +-
.../src/emqx_rule_engine_api.erl | 4 +-
5 files changed, 74 insertions(+), 31 deletions(-)
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 7e934b997..bc865906f 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -38,7 +38,7 @@
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
- ". Bridge ID must be of format 'bridge_type:name'">>}}
+ ". Bridge Ids must be of format {type}:{name}">>}}
end).
namespace() -> "connector".
@@ -53,18 +53,61 @@ error_schema(Code, Message) ->
, {message, mk(string(), #{example => Message})}
].
+put_request_body_schema() ->
+ emqx_dashboard_swagger:schema_with_examples(
+ connector_info(put_req), connector_info_examples()).
+
+post_request_body_schema() ->
+ emqx_dashboard_swagger:schema_with_examples(
+ connector_info(post_req), connector_info_examples()).
+
+get_response_body_schema() ->
+ emqx_dashboard_swagger:schema_with_example(
+ connector_info(), connector_info_examples()).
+
connector_info() ->
+ connector_info(resp).
+
+connector_info(resp) ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info")
- ]).
-
-connector_test_info() ->
- hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_test_info")
- ]).
-
-connector_req() ->
+ ]);
+connector_info(put_req) ->
+ hoconsc:union([ ref(emqx_connector_mqtt_schema, "connector")
+ ]);
+connector_info(post_req) ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector")
]).
+connector_info_array_example() ->
+ [Config || #{value := Config} <- maps:values(connector_info_examples())].
+
+connector_info_examples() ->
+ #{
+ mqtt => #{
+ summary => <<"MQTT Bridge">>,
+ value => mqtt_info_example()
+ }
+ }.
+
+mqtt_info_example() ->
+ #{
+ type => <<"mqtt">>,
+ server => <<"127.0.0.1:1883">>,
+ reconnect_interval => <<"30s">>,
+ proto_ver => <<"v4">>,
+ bridge_mode => true,
+ username => <<"foo">>,
+ password => <<"bar">>,
+ clientid => <<"foo">>,
+ clean_start => true,
+ keepalive => <<"300s">>,
+ retry_interval => <<"30s">>,
+ max_inflight => 100,
+ ssl => #{
+ enable => false
+ }
+ }.
+
param_path_id() ->
[{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}].
@@ -74,9 +117,9 @@ schema("/connectors_test") ->
post => #{
tags => [<<"connectors">>],
description => <<"Test creating a new connector by given Id
"
- "The ID must be of format 'type:name'">>,
+ "The ID must be of format '{type}:{name}'">>,
summary => <<"Test creating connector">>,
- requestBody => connector_test_info(),
+ requestBody => post_request_body_schema(),
responses => #{
200 => <<"Test connector OK">>,
400 => error_schema('TEST_FAILED', "connector test failed")
@@ -92,17 +135,19 @@ schema("/connectors") ->
description => <<"List all connectors">>,
summary => <<"List connectors">>,
responses => #{
- 200 => mk(array(connector_info()), #{desc => "List of connectors"})
+ 200 => emqx_dashboard_swagger:schema_with_example(
+ array(connector_info()),
+ connector_info_array_example())
}
},
post => #{
tags => [<<"connectors">>],
description => <<"Create a new connector by given Id
"
- "The ID must be of format 'type:name'">>,
+ "The ID must be of format '{type}:{name}'">>,
summary => <<"Create connector">>,
- requestBody => connector_info(),
+ requestBody => post_request_body_schema(),
responses => #{
- 201 => connector_info(),
+ 201 => get_response_body_schema(),
400 => error_schema('ALREADY_EXISTS', "connector already exists")
}
}
@@ -117,7 +162,7 @@ schema("/connectors/:id") ->
summary => <<"Get connector">>,
parameters => param_path_id(),
responses => #{
- 200 => connector_info(),
+ 200 => get_response_body_schema(),
404 => error_schema('NOT_FOUND', "Connector not found")
}
},
@@ -126,9 +171,9 @@ schema("/connectors/:id") ->
description => <<"Update an existing connector by Id">>,
summary => <<"Update connector">>,
parameters => param_path_id(),
- requestBody => connector_req(),
+ requestBody => put_request_body_schema(),
responses => #{
- 200 => <<"Update connector successfully">>,
+ 200 => get_response_body_schema(),
400 => error_schema('UPDATE_FAIL', "Update failed"),
404 => error_schema('NOT_FOUND', "Connector not found")
}},
@@ -143,8 +188,8 @@ schema("/connectors/:id") ->
}}
}.
-'/connectors_test'(post, #{body := #{<<"bridge_type">> := ConnType} = Params}) ->
- case emqx_connector:create_dry_run(ConnType, maps:remove(<<"bridge_type">>, Params)) of
+'/connectors_test'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
+ case emqx_connector:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of
ok -> {200};
{error, Error} ->
{400, error_msg('BAD_ARG', Error)}
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 1acd8b298..190456262 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -100,7 +100,7 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
- clientid => clientid(InstanceId),
+ clientid => clientid(maps:get(clientid, Conf, InstanceId)),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
@@ -190,4 +190,4 @@ basic_config(#{
}.
clientid(Id) ->
- list_to_binary(lists:concat([Id, ":", node()])).
+ unicode:characters_to_binary([Id, ":", atom_to_list(node())], utf8).
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
index 3caf2b595..518d4e62d 100644
--- a/apps/emqx_connector/src/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -22,14 +22,13 @@ fields("connectors") ->
];
fields("mqtt_connector") ->
- emqx_connector_mqtt_schema:fields("connector");
+ [ {type, sc(mqtt, #{desc => "The Connector Type"})}
+ %, {name, sc(binary(), #{desc => "The Connector Name"})}
+ ]
+ ++ emqx_connector_mqtt_schema:fields("connector");
fields("mqtt_connector_info") ->
- [{id, sc(binary(), #{desc => "The connector Id"})}]
- ++ fields("mqtt_connector");
-
-fields("mqtt_connector_test_info") ->
- [{bridge_type, sc(mqtt, #{desc => "The Bridge Type"})}]
+ [{id, sc(binary(), #{desc => "The connector Id", example => "mqtt:foo"})}]
++ fields("mqtt_connector").
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 6436a4c96..415c6fa1a 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -66,8 +66,7 @@ fields("connector") ->
})}
, {clientid,
sc(binary(),
- #{ default => "emqx_${nodename}"
- , desc => "The clientid of the MQTT protocol"
+ #{ desc => "The clientid of the MQTT protocol"
})}
, {clean_start,
sc(boolean(),
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
index 9e341b388..2629a33e7 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
@@ -92,7 +92,7 @@ api_rules_crud() ->
<<"200">> =>
emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}},
put => #{
- description => <<"Create or update a rule by given Id to all nodes in the cluster">>,
+ description => <<"Update a rule by given Id to all nodes in the cluster">>,
parameters => [param_path_id()],
'requestBody' => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>),
responses => #{
@@ -100,7 +100,7 @@ api_rules_crud() ->
emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
<<"200">> =>
emqx_mgmt_util:schema(resp_schema(),
- <<"Create or update rule successfully">>)}},
+ <<"Update rule successfully">>)}},
delete => #{
description => <<"Delete a rule by given Id from all nodes in the cluster">>,
parameters => [param_path_id()],
From 24bded99d54e02146ba256b9aabc2c5b60af0b2b Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 3 Dec 2021 10:51:39 +0800
Subject: [PATCH 02/10] refactor(metrics): rename speed to rate in
emqx_plugin_libs_metrics
---
apps/emqx_bridge/src/emqx_bridge_api.erl | 18 +--
.../src/emqx_plugin_libs_metrics.erl | 110 +++++++++---------
.../src/emqx_rule_engine_api.erl | 12 +-
3 files changed, 70 insertions(+), 70 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 6e274903e..a82969dde 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -40,17 +40,17 @@
#{ matched => MATCH,
success => SUCC,
failed => FAILED,
- speed => RATE,
- speed_last5m => RATE_5,
- speed_max => RATE_MAX
+ rate => RATE,
+ rate_last5m => RATE_5,
+ rate_max => RATE_MAX
}).
-define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{ matched := MATCH,
success := SUCC,
failed := FAILED,
- speed := RATE,
- speed_last5m := RATE_5,
- speed_max := RATE_MAX
+ rate := RATE,
+ rate_last5m := RATE_5,
+ rate_max := RATE_MAX
}).
req_schema() ->
@@ -76,9 +76,9 @@ metrics_schema() ->
matched => #{type => integer, example => "0"},
success => #{type => integer, example => "0"},
failed => #{type => integer, example => "0"},
- speed => #{type => number, format => float, example => "0.0"},
- speed_last5m => #{type => number, format => float, example => "0.0"},
- speed_max => #{type => number, format => float, example => "0.0"}
+ rate => #{type => number, format => float, example => "0.0"},
+ rate_last5m => #{type => number, format => float, example => "0.0"},
+ rate_max => #{type => number, format => float, example => "0.0"}
}
}.
diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl
index 824890efc..d48b10dd1 100644
--- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl
+++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl
@@ -27,7 +27,7 @@
-export([ inc/3
, inc/4
, get/3
- , get_speed/2
+ , get_rate/2
, create_metrics/2
, clear_metrics/2
]).
@@ -54,7 +54,7 @@
-define(SECS_5M, 300).
-define(SAMPLING, 10).
-else.
-%% Use 5 secs average speed instead of 5 mins in case of testing
+%% Use 5 secs average rate instead of 5 mins in case of testing
-define(SECS_5M, 5).
-define(SAMPLING, 1).
-endif.
@@ -65,9 +65,9 @@
matched => integer(),
success => integer(),
failed => integer(),
- speed => float(),
- speed_max => float(),
- speed_last5m => float()
+ rate => float(),
+ rate_max => float(),
+ rate_last5m => float()
}.
-type handler_name() :: atom().
-type metric_id() :: binary().
@@ -75,22 +75,22 @@
-define(CntrRef(Name), {?MODULE, Name}).
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
-%% the speed of 'matched'
--record(speed, {
+%% the rate of 'matched'
+-record(rate, {
max = 0 :: number(),
current = 0 :: number(),
last5m = 0 :: number(),
- %% metadata for calculating the avg speed
+ %% metadata for calculating the avg rate
tick = 1 :: number(),
last_v = 0 :: number(),
- %% metadata for calculating the 5min avg speed
+ %% metadata for calculating the 5min avg rate
last5m_acc = 0 :: number(),
last5m_smpl = [] :: list()
}).
-record(state, {
metric_ids = sets:new(),
- speeds :: undefined | #{metric_id() => #speed{}}
+ rates :: undefined | #{metric_id() => #rate{}}
}).
%%------------------------------------------------------------------------------
@@ -122,19 +122,19 @@ get(Name, Id, Metric) ->
Ref -> counters:get(Ref, metrics_idx(Metric))
end.
--spec(get_speed(handler_name(), metric_id()) -> map()).
-get_speed(Name, Id) ->
- gen_server:call(Name, {get_speed, Id}).
+-spec(get_rate(handler_name(), metric_id()) -> map()).
+get_rate(Name, Id) ->
+ gen_server:call(Name, {get_rate, Id}).
-spec(get_metrics(handler_name(), metric_id()) -> metrics()).
get_metrics(Name, Id) ->
- #{max := Max, current := Current, last5m := Last5M} = get_speed(Name, Id),
+ #{max := Max, current := Current, last5m := Last5M} = get_rate(Name, Id),
#{matched => get_matched(Name, Id),
success => get_success(Name, Id),
failed => get_failed(Name, Id),
- speed => Current,
- speed_max => Max,
- speed_last5m => Last5M
+ rate => Current,
+ rate_max => Max,
+ rate_last5m => Last5M
}.
-spec inc(handler_name(), metric_id(), atom()) -> ok.
@@ -176,35 +176,35 @@ start_link(Name) ->
init(Name) ->
erlang:process_flag(trap_exit, true),
- %% the speed metrics
+ %% the rate metrics
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
persistent_term:put(?CntrRef(Name), #{}),
{ok, #state{}}.
-handle_call({get_speed, _Id}, _From, State = #state{speeds = undefined}) ->
- {reply, format_speed(#speed{}), State};
-handle_call({get_speed, Id}, _From, State = #state{speeds = Speeds}) ->
- {reply, case maps:get(Id, Speeds, undefined) of
- undefined -> format_speed(#speed{});
- Speed -> format_speed(Speed)
+handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
+ {reply, format_rate(#rate{}), State};
+handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
+ {reply, case maps:get(Id, Rates, undefined) of
+ undefined -> format_rate(#rate{});
+ Rate -> format_rate(Rate)
end, State};
handle_call({create_metrics, Id}, _From,
- State = #state{metric_ids = MIDs, speeds = Speeds}) ->
+ State = #state{metric_ids = MIDs, rates = Rates}) ->
{reply, create_counters(get_self_name(), Id),
State#state{metric_ids = sets:add_element(Id, MIDs),
- speeds = case Speeds of
- undefined -> #{Id => #speed{}};
- _ -> Speeds#{Id => #speed{}}
+ rates = case Rates of
+ undefined -> #{Id => #rate{}};
+ _ -> Rates#{Id => #rate{}}
end}};
handle_call({delete_metrics, Id}, _From,
- State = #state{metric_ids = MIDs, speeds = Speeds}) ->
+ State = #state{metric_ids = MIDs, rates = Rates}) ->
{reply, delete_counters(get_self_name(), Id),
State#state{metric_ids = sets:del_element(Id, MIDs),
- speeds = case Speeds of
+ rates = case Rates of
undefined -> undefined;
- _ -> maps:remove(Id, Speeds)
+ _ -> maps:remove(Id, Rates)
end}};
handle_call(_Request, _From, State) ->
@@ -213,17 +213,17 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info(ticking, State = #state{speeds = undefined}) ->
+handle_info(ticking, State = #state{rates = undefined}) ->
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
{noreply, State};
-handle_info(ticking, State = #state{speeds = Speeds0}) ->
- Speeds = maps:map(
- fun(Id, Speed) ->
- calculate_speed(get_matched(get_self_name(), Id), Speed)
- end, Speeds0),
+handle_info(ticking, State = #state{rates = Rates0}) ->
+ Rates = maps:map(
+ fun(Id, Rate) ->
+ calculate_rate(get_matched(get_self_name(), Id), Rate)
+ end, Rates0),
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
- {noreply, State#state{speeds = Speeds}};
+ {noreply, State#state{rates = Rates}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -261,38 +261,38 @@ get_couters_ref(Name, Id) ->
get_all_counters(Name) ->
persistent_term:get(?CntrRef(Name), #{}).
-calculate_speed(_CurrVal, undefined) ->
+calculate_rate(_CurrVal, undefined) ->
undefined;
-calculate_speed(CurrVal, #speed{max = MaxSpeed0, last_v = LastVal,
- tick = Tick, last5m_acc = AccSpeed5Min0,
+calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
+ tick = Tick, last5m_acc = AccRate5Min0,
last5m_smpl = Last5MinSamples0}) ->
- %% calculate the current speed based on the last value of the counter
- CurrSpeed = (CurrVal - LastVal) / ?SAMPLING,
+ %% calculate the current rate based on the last value of the counter
+ CurrRate = (CurrVal - LastVal) / ?SAMPLING,
- %% calculate the max speed since the emqx startup
- MaxSpeed =
- if MaxSpeed0 >= CurrSpeed -> MaxSpeed0;
- true -> CurrSpeed
+ %% calculate the max rate since the emqx startup
+ MaxRate =
+ if MaxRate0 >= CurrRate -> MaxRate0;
+ true -> CurrRate
end,
- %% calculate the average speed in last 5 mins
+ %% calculate the average rate in last 5 mins
{Last5MinSamples, Acc5Min, Last5Min} =
if Tick =< ?SAMPCOUNT_5M ->
- Acc = AccSpeed5Min0 + CurrSpeed,
- {lists:reverse([CurrSpeed | lists:reverse(Last5MinSamples0)]),
+ Acc = AccRate5Min0 + CurrRate,
+ {lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]),
Acc, Acc / Tick};
true ->
- [FirstSpeed | Speeds] = Last5MinSamples0,
- Acc = AccSpeed5Min0 + CurrSpeed - FirstSpeed,
- {lists:reverse([CurrSpeed | lists:reverse(Speeds)]),
+ [FirstRate | Rates] = Last5MinSamples0,
+ Acc = AccRate5Min0 + CurrRate - FirstRate,
+ {lists:reverse([CurrRate | lists:reverse(Rates)]),
Acc, Acc / ?SAMPCOUNT_5M}
end,
- #speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min,
+ #rate{max = MaxRate, current = CurrRate, last5m = Last5Min,
last_v = CurrVal, last5m_acc = Acc5Min,
last5m_smpl = Last5MinSamples, tick = Tick + 1}.
-format_speed(#speed{max = Max, current = Current, last5m = Last5Min}) ->
+format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) ->
#{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
precision(Float, N) ->
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
index 2629a33e7..e5fd2de1c 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
@@ -339,14 +339,14 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
get_rule_metrics(Id) ->
Format = fun (Node, #{matched := Matched,
- speed := Current,
- speed_max := Max,
- speed_last5m := Last5M
+ rate := Current,
+ rate_max := Max,
+ rate_last5m := Last5M
}) ->
#{ matched => Matched
- , speed => Current
- , speed_max => Max
- , speed_last5m => Last5M
+ , rate => Current
+ , rate_max => Max
+ , rate_last5m => Last5M
, node => Node
}
end,
From 416b9f8d7c52299ef579af9574399a02f53ef535 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 3 Dec 2021 10:50:22 +0800
Subject: [PATCH 03/10] refactor(rule): generate swagger from hocon schema for
/rules
---
.../src/emqx_rule_api_schema.erl | 41 ++-
.../src/emqx_rule_engine_api.erl | 282 +++++++-----------
.../src/emqx_rule_engine_schema.erl | 63 +++-
.../emqx_rule_engine/src/emqx_rule_events.erl | 26 +-
.../src/emqx_rule_runtime.erl | 6 +-
5 files changed, 217 insertions(+), 201 deletions(-)
diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
index 448f63138..1fe75447e 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
@@ -32,13 +32,40 @@ check_params(Params, Tag) ->
roots() ->
[ {"rule_creation", sc(ref("rule_creation"), #{desc => "Schema for creating rules"})}
+ , {"rule_info", sc(ref("rule_info"), #{desc => "Schema for rule info"})}
+ , {"rule_events", sc(ref("rule_events"), #{desc => "Schema for rule events"})}
, {"rule_test", sc(ref("rule_test"), #{desc => "Schema for testing rules"})}
].
fields("rule_creation") ->
- [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})}
+ [ {"id", sc(binary(),
+ #{ desc => "The Id of the rule", nullable => false
+ , example => "my_rule_id"
+ })}
] ++ emqx_rule_engine_schema:fields("rules");
+fields("rule_info") ->
+ [ {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
+ , {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})}
+ , {"from", sc(hoconsc:array(binary()),
+ #{desc => "The topics of the rule", example => "t/#"})}
+ , {"created_at", sc(binary(),
+ #{ desc => "The created time of the rule"
+ , example => "2021-12-01T15:00:43.153+08:00"
+ })}
+ ] ++ fields("rule_creation");
+
+%% TODO: we can delete this API if the Dashboard not denpends on it
+fields("rule_events") ->
+ ETopics = [emqx_rule_events:event_topic(E) || E <- emqx_rule_events:event_names()],
+ [ {"event", sc(hoconsc:enum(ETopics), #{desc => "The event topics", nullable => false})}
+ , {"title", sc(binary(), #{desc => "The title", example => "some title"})}
+ , {"description", sc(binary(), #{desc => "The description", example => "some desc"})}
+ , {"columns", sc(map(), #{desc => "The columns"})}
+ , {"test_columns", sc(map(), #{desc => "The test columns"})}
+ , {"sql_example", sc(binary(), #{desc => "The sql_example"})}
+ ];
+
fields("rule_test") ->
[ {"context", sc(hoconsc:union([ ref("ctx_pub")
, ref("ctx_sub")
@@ -53,6 +80,18 @@ fields("rule_test") ->
, {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})}
];
+fields("metrics") ->
+ [ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})}
+ , {"rate", sc(float(), #{desc => "The rate of matched, times/second"})}
+ , {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})}
+ , {"rate_last5m", sc(float(),
+ #{desc => "The average rate of matched in last 5 mins, times/second"})}
+ ];
+
+fields("node_metrics") ->
+ [ {"node", sc(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}
+ ] ++ fields("metrics");
+
fields("ctx_pub") ->
[ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})}
, {"id", sc(binary(), #{desc => "Message ID"})}
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
index e5fd2de1c..75238fb71 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
@@ -18,16 +18,17 @@
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
+-include_lib("typerefl/include/types.hrl").
-behaviour(minirest_api).
--export([api_spec/0]).
+-import(hoconsc, [mk/2, ref/2, array/1]).
--export([ crud_rules/2
- , list_events/2
- , crud_rules_by_id/2
- , rule_test/2
- ]).
+%% Swagger specs from hocon schema
+-export([api_spec/0, paths/0, schema/1, namespace/0]).
+
+%% API callbacks
+-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]).
-define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
-define(ERR_BADARGS(REASON),
@@ -43,210 +44,130 @@
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
end).
+namespace() -> "rule".
+
api_spec() ->
- {
- [ api_rules_list_create()
- , api_rules_crud()
- , api_rule_test()
- , api_events_list()
- ],
- []
- }.
+ emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
-api_rules_list_create() ->
- Metadata = #{
+paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id"].
+
+error_schema(Code, Message) ->
+ [ {code, mk(string(), #{example => Code})}
+ , {message, mk(string(), #{example => Message})}
+ ].
+
+rule_creation_schema() ->
+ ref(emqx_rule_api_schema, "rule_creation").
+
+rule_update_schema() ->
+ ref(emqx_rule_engine_schema, "rules").
+
+rule_test_schema() ->
+ ref(emqx_rule_api_schema, "rule_test").
+
+rule_info_schema() ->
+ ref(emqx_rule_api_schema, "rule_info").
+
+schema("/rules") ->
+ #{
+ operationId => '/rules',
get => #{
+ tags => [<<"rules">>],
description => <<"List all rules">>,
+ summary => <<"List Rules">>,
responses => #{
- <<"200">> =>
- emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}},
+ 200 => mk(array(rule_info_schema()), #{desc => "List of rules"})
+ }},
post => #{
- description => <<"Create a new rule using given Id to all nodes in the cluster">>,
- 'requestBody' => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>),
+ tags => [<<"rules">>],
+ description => <<"Create a new rule using given Id">>,
+ summary => <<"Create a Rule">>,
+ requestBody => rule_creation_schema(),
responses => #{
- <<"400">> =>
- emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
- <<"201">> =>
- emqx_mgmt_util:schema(resp_schema(), <<"Create rule successfully">>)}}
- },
- {"/rules", Metadata, crud_rules}.
+ 400 => error_schema('BAD_ARGS', "Invalid Parameters"),
+ 201 => rule_info_schema()
+ }}
+ };
-api_events_list() ->
- Metadata = #{
+schema("/rule_events") ->
+ #{
+ operationId => '/rule_events',
get => #{
+ tags => [<<"rules">>],
description => <<"List all events can be used in rules">>,
+ summary => <<"List Events">>,
responses => #{
- <<"200">> =>
- emqx_mgmt_util:array_schema(resp_schema(), <<"List events successfully">>)}}
- },
- {"/rule_events", Metadata, list_events}.
+ 200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{})
+ }
+ }
+ };
-api_rules_crud() ->
- Metadata = #{
+schema("/rules/:id") ->
+ #{
+ operationId => '/rules/:id',
get => #{
+ tags => [<<"rules">>],
description => <<"Get a rule by given Id">>,
- parameters => [param_path_id()],
+ summary => <<"Get a Rule">>,
+ parameters => param_path_id(),
responses => #{
- <<"404">> =>
- emqx_mgmt_util:error_schema(<<"Rule not found">>, ['NOT_FOUND']),
- <<"200">> =>
- emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}},
+ 404 => error_schema('NOT_FOUND', "Rule not found"),
+ 200 => rule_info_schema()
+ }
+ },
put => #{
+ tags => [<<"rules">>],
description => <<"Update a rule by given Id to all nodes in the cluster">>,
- parameters => [param_path_id()],
- 'requestBody' => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>),
+ summary => <<"Update a Rule">>,
+ parameters => param_path_id(),
+ requestBody => rule_update_schema(),
responses => #{
- <<"400">> =>
- emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
- <<"200">> =>
- emqx_mgmt_util:schema(resp_schema(),
- <<"Update rule successfully">>)}},
+ 400 => error_schema('BAD_ARGS', "Invalid Parameters"),
+ 200 => rule_info_schema()
+ }
+ },
delete => #{
+ tags => [<<"rules">>],
description => <<"Delete a rule by given Id from all nodes in the cluster">>,
- parameters => [param_path_id()],
+ summary => <<"Delete a Rule">>,
+ parameters => param_path_id(),
responses => #{
- <<"204">> =>
- emqx_mgmt_util:schema(<<"Delete rule successfully">>)}}
- },
- {"/rules/:id", Metadata, crud_rules_by_id}.
+ 204 => <<"Delete rule successfully">>
+ }
+ }
+ };
-api_rule_test() ->
- Metadata = #{
+schema("/rule_test") ->
+ #{
+ operationId => '/rule_test',
post => #{
+ tags => [<<"rules">>],
description => <<"Test a rule">>,
- 'requestBody' => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>),
+ summary => <<"Test a Rule">>,
+ requestBody => rule_test_schema(),
responses => #{
- <<"400">> =>
- emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
- <<"412">> =>
- emqx_mgmt_util:error_schema(<<"SQL Not Match">>, ['NOT_MATCH']),
- <<"200">> =>
- emqx_mgmt_util:schema(rule_test_resp_schema(), <<"Rule Test Pass">>)}}
- },
- {"/rule_test", Metadata, rule_test}.
-
-put_req_schema() ->
- #{type => object,
- properties => #{
- sql => #{
- description => <<"The SQL">>,
- type => string,
- example => <<"SELECT * from \"t/1\"">>
- },
- enable => #{
- description => <<"Enable or disable the rule">>,
- type => boolean,
- example => true
- },
- outputs => #{
- description => <<"The outputs of the rule">>,
- type => array,
- items => #{
- 'oneOf' => [
- #{
- type => string,
- example => <<"channel_id_of_my_bridge">>,
- description => <<"The channel id of an emqx bridge">>
- },
- #{
- type => object,
- properties => #{
- function => #{
- type => string,
- example => <<"console">>
- }
- }
- }
- ]
+ 400 => error_schema('BAD_ARGS', "Invalid Parameters"),
+ 412 => error_schema('NOT_MATCH', "SQL Not Match"),
+ 200 => <<"Rule Test Pass">>
}
- },
- description => #{
- description => <<"The description for the rule">>,
- type => string,
- example => <<"A simple rule that handles MQTT messages from topic \"t/1\"">>
}
- }
}.
-post_req_schema() ->
- Req = #{properties := Prop} = put_req_schema(),
- Req#{properties => Prop#{
- id => #{
- description => <<"The Id for the rule">>,
- example => <<"my_rule">>,
- type => string
- }
- }}.
-
-resp_schema() ->
- Req = #{properties := Prop} = put_req_schema(),
- Req#{properties => Prop#{
- id => #{
- description => <<"The Id for the rule">>,
- type => string
- },
- created_at => #{
- description => <<"The time that this rule was created, in rfc3339 format">>,
- type => string,
- example => <<"2021-09-18T13:57:29+08:00">>
- }
- }}.
-
-rule_test_req_schema() ->
- #{type => object, properties => #{
- sql => #{
- description => <<"The SQL">>,
- type => string,
- example => <<"SELECT * from \"t/1\"">>
- },
- context => #{
- type => object,
- properties => #{
- event_type => #{
- description => <<"Event Type">>,
- type => string,
- enum => [<<"message_publish">>, <<"message_acked">>, <<"message_delivered">>,
- <<"message_dropped">>, <<"session_subscribed">>, <<"session_unsubscribed">>,
- <<"client_connected">>, <<"client_disconnected">>],
- example => <<"message_publish">>
- },
- clientid => #{
- description => <<"The Client ID">>,
- type => string,
- example => <<"\"c_emqx\"">>
- },
- topic => #{
- description => <<"The Topic">>,
- type => string,
- example => <<"t/1">>
- }
- }
- }
- }}.
-
-rule_test_resp_schema() ->
- #{type => object}.
-
param_path_id() ->
- #{
- name => id,
- in => path,
- schema => #{type => string},
- required => true
- }.
+ [{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}].
%%------------------------------------------------------------------------------
%% Rules API
%%------------------------------------------------------------------------------
-list_events(#{}, _Params) ->
+'/rule_events'(get, _Params) ->
{200, emqx_rule_events:event_info()}.
-crud_rules(get, _Params) ->
+'/rules'(get, _Params) ->
Records = emqx_rule_engine:get_rules_ordered_by_ts(),
{200, format_rule_resp(Records)};
-crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
+'/rules'(post, #{body := #{<<"id">> := Id} = Params}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} ->
@@ -263,13 +184,13 @@ crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
end
end.
-rule_test(post, #{body := Params}) ->
+'/rule_test'(post, #{body := Params}) ->
?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of
{ok, Result} -> {200, Result};
{error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}
end).
-crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
+'/rules/:id'(get, #{bindings := #{id := Id}}) ->
case emqx_rule_engine:get_rule(Id) of
{ok, Rule} ->
{200, format_rule_resp(Rule)};
@@ -277,7 +198,7 @@ crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end;
-crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
+'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
@@ -289,7 +210,7 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
end;
-crud_rules_by_id(delete, #{bindings := #{id := Id}}) ->
+'/rules/:id'(delete, #{bindings := #{id := Id}}) ->
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:remove_config(ConfPath, #{}) of
{ok, _} -> {204};
@@ -315,11 +236,13 @@ format_rule_resp(#{ id := Id, created_at := CreatedAt,
sql := SQL,
enabled := Enabled,
description := Descr}) ->
+ NodeMetrics = get_rule_metrics(Id),
#{id => Id,
from => Topics,
outputs => format_output(Output),
sql => SQL,
- metrics => get_rule_metrics(Id),
+ metrics => aggregate_metrics(NodeMetrics),
+ node_metrics => NodeMetrics,
enabled => Enabled,
created_at => format_datetime(CreatedAt, millisecond),
description => Descr
@@ -353,5 +276,14 @@ get_rule_metrics(Id) ->
[Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id]))
|| Node <- mria_mnesia:running_nodes()].
+aggregate_metrics(AllMetrics) ->
+ InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0},
+ lists:foldl(fun
+ (#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1},
+ #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) ->
+ #{matched => Match1 + Match0, rate => Rate1 + Rate0,
+ rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0}
+ end, InitMetrics, AllMetrics).
+
get_one_rule(AllRules, Id) ->
[R || R = #{id := Id0} <- AllRules, Id0 == Id].
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
index 995044fc7..93661ab53 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
@@ -44,19 +44,17 @@ fields("rules") ->
SQL query to transform the messages.
Example: SELECT * FROM \"test/topic\" WHERE payload.x = 1
"""
+ , example => "SELECT * FROM \"test/topic\" WHERE payload.x = 1"
, nullable => false
- , validator => fun ?MODULE:validate_sql/1})}
- , {"outputs", sc(hoconsc:array(hoconsc:union(
- [ binary()
- , ref("builtin_output_republish")
- , ref("builtin_output_console")
- ])),
+ , validator => fun ?MODULE:validate_sql/1
+ })}
+ , {"outputs", sc(hoconsc:array(hoconsc:union(outputs())),
#{ desc => """
A list of outputs of the rule.
An output can be a string that refers to the channel Id of a emqx bridge, or a object
that refers to a function.
There a some built-in functions like \"republish\" and \"console\", and we also support user
-provided functions like \"ModuleName:FunctionName\".
+provided functions in the format: \"{module}:{function}\".
The outputs in the list is executed one by one in order.
This means that if one of the output is executing slowly, all of the outputs comes after it will not
be executed until it returns.
@@ -66,9 +64,19 @@ If there's any error when running an output, there will be an error message, and
counter of the function output or the bridge channel will increase.
"""
, default => []
+ , example => [
+ <<"http:my_http_bridge">>,
+ #{function => republish, args => #{
+ topic => <<"t/1">>, payload => <<"${payload}">>}},
+ #{function => console}
+ ]
})}
, {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
- , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
+ , {"description", sc(binary(),
+ #{ desc => "The description of the rule"
+ , example => "Some description"
+ , default => <<>>
+ })}
];
fields("builtin_output_republish") ->
@@ -106,6 +114,27 @@ fields("builtin_output_console") ->
% default => #{}})}
];
+fields("user_provided_function") ->
+ [ {function, sc(binary(),
+ #{ desc => """
+The user provided function. Should be in the format: '{module}:{function}'.
+Where the is the erlang callback module and the {function} is the erlang function.
+To write your own function, checkout the function console
and
+republish
in the source file:
+apps/emqx_rule_engine/src/emqx_rule_outputs.erl
as an example.
+"""
+ , example => "module:function"
+ })}
+ , {args, sc(map(),
+ #{ desc => """
+The args will be passed as the 3rd argument to module:function/3,
+checkout the function console
and republish
in the source file:
+apps/emqx_rule_engine/src/emqx_rule_outputs.erl
as an example.
+"""
+ , default => #{}
+ })}
+ ];
+
fields("republish_args") ->
[ {topic, sc(binary(),
#{ desc =>"""
@@ -113,8 +142,9 @@ The target topic of message to be re-published.
Template with variables is allowed, see description of the 'republish_args'.
"""
, nullable => false
+ , example => <<"a/1">>
})}
- , {qos, sc(binary(),
+ , {qos, sc(qos(),
#{ desc => """
The qos of the message to be re-published.
Template with with variables is allowed, see description of the 'republish_args.
@@ -122,8 +152,9 @@ Defaults to ${qos}. If variable ${qos} is not found from the selected result of
0 is used.
"""
, default => <<"${qos}">>
+ , example => <<"${qos}">>
})}
- , {retain, sc(binary(),
+ , {retain, sc(hoconsc:union([binary(), boolean()]),
#{ desc => """
The retain flag of the message to be re-published.
Template with with variables is allowed, see description of the 'republish_args.
@@ -131,6 +162,7 @@ Defaults to ${retain}. If variable ${retain} is not found from the selected resu
of the rule, false is used.
"""
, default => <<"${retain}">>
+ , example => <<"${retain}">>
})}
, {payload, sc(binary(),
#{ desc => """
@@ -140,9 +172,20 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
of the rule, then the string \"undefined\" is used.
"""
, default => <<"${payload}">>
+ , example => <<"${payload}">>
})}
].
+outputs() ->
+ [ binary()
+ , ref("builtin_output_republish")
+ , ref("builtin_output_console")
+ , ref("user_provided_function")
+ ].
+
+qos() ->
+ hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
+
validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of
{ok, _Result} -> ok;
diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl
index 0aff9f018..c61629b39 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_events.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl
@@ -25,7 +25,9 @@
, load/1
, unload/0
, unload/1
+ , event_names/0
, event_name/1
+ , event_topic/1
, eventmsg_publish/1
]).
@@ -45,17 +47,6 @@
, columns_with_exam/1
]).
--define(SUPPORTED_HOOK,
- [ 'client.connected'
- , 'client.disconnected'
- , 'session.subscribed'
- , 'session.unsubscribed'
- , 'message.publish'
- , 'message.delivered'
- , 'message.acked'
- , 'message.dropped'
- ]).
-
-ifdef(TEST).
-export([ reason/1
, hook_fun/1
@@ -63,6 +54,17 @@
]).
-endif.
+event_names() ->
+ [ 'client.connected'
+ , 'client.disconnected'
+ , 'session.subscribed'
+ , 'session.unsubscribed'
+ , 'message.publish'
+ , 'message.delivered'
+ , 'message.acked'
+ , 'message.dropped'
+ ].
+
reload() ->
lists:foreach(fun(Rule) ->
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
@@ -78,7 +80,7 @@ load(Topic) ->
unload() ->
lists:foreach(fun(HookPoint) ->
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
- end, ?SUPPORTED_HOOK).
+ end, event_names()).
unload(Topic) ->
HookPoint = event_name(Topic),
diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
index 7b68b3ee3..1cabf3e32 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
@@ -247,9 +247,9 @@ handle_output(OutId, Selected, Envs) ->
})
end.
-do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) ->
- ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}),
- emqx_bridge:send_message(ChannelId, Selected);
+do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
+ ?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}),
+ emqx_bridge:send_message(BridgeId, Selected);
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
Mod:Func(Selected, Envs, Args).
From affe69afd69479d7ee1ecf43697ab25b4a75dabc Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 3 Dec 2021 11:27:41 +0800
Subject: [PATCH 04/10] fix(rules): update the test cases for rule APIs
---
.../test/emqx_rule_engine_api_SUITE.erl | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl
index 712d113f9..4dd564b36 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl
@@ -36,34 +36,34 @@ t_crud_rule_api(_Config) ->
<<"outputs">> => [#{<<"function">> => <<"console">>}],
<<"sql">> => <<"SELECT * from \"t/1\"">>
},
- {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}),
+ {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params0}),
%% if we post again with the same params, it return with 400 "rule id already exists"
?assertMatch({400, #{code := _, message := _Message}},
- emqx_rule_engine_api:crud_rules(post, #{body => Params0})),
+ emqx_rule_engine_api:'/rules'(post, #{body => Params0})),
?assertEqual(RuleID, maps:get(id, Rule)),
- {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}),
+ {200, Rules} = emqx_rule_engine_api:'/rules'(get, #{}),
ct:pal("RList : ~p", [Rules]),
?assert(length(Rules) > 0),
- {200, Rule1} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
+ {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
ct:pal("RShow : ~p", [Rule1]),
?assertEqual(Rule, Rule1),
- {200, Rule2} = emqx_rule_engine_api:crud_rules_by_id(put, #{
+ {200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
bindings => #{id => RuleID},
body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>}
}),
- {200, Rule3} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
+ {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
%ct:pal("RShow : ~p", [Rule3]),
?assertEqual(Rule3, Rule2),
?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
- ?assertMatch({204}, emqx_rule_engine_api:crud_rules_by_id(delete,
+ ?assertMatch({204}, emqx_rule_engine_api:'/rules/:id'(delete,
#{bindings => #{id => RuleID}})),
%ct:pal("Show After Deleted: ~p", [NotFound]),
?assertMatch({404, #{code := _, message := _Message}},
- emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}})),
+ emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}})),
ok.
From 56d46c80ebf6c502fb47c2ef89f022e87f1f4186 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Sat, 4 Dec 2021 21:22:20 +0800
Subject: [PATCH 05/10] refactor(rule): generate swagger from hocon schema for
/bridges
---
apps/emqx_bridge/src/emqx_bridge_api.erl | 365 ++++++++++--------
.../src/emqx_bridge_http_schema.erl | 95 +++++
.../src/emqx_bridge_mqtt_schema.erl | 62 +++
apps/emqx_bridge/src/emqx_bridge_schema.erl | 140 ++-----
.../emqx_connector/src/emqx_connector_api.erl | 2 +-
.../src/emqx_connector_mqtt.erl | 4 +-
6 files changed, 410 insertions(+), 258 deletions(-)
create mode 100644 apps/emqx_bridge/src/emqx_bridge_http_schema.erl
create mode 100644 apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index a82969dde..a5c24be9f 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -17,23 +17,32 @@
-behaviour(minirest_api).
--export([api_spec/0]).
+-include_lib("typerefl/include/types.hrl").
--export([ list_create_bridges_in_cluster/2
- , list_local_bridges/1
- , crud_bridges_in_cluster/2
- , manage_bridges/2
+-import(hoconsc, [mk/2, array/1, enum/1]).
+
+%% Swagger specs from hocon schema
+-export([api_spec/0, paths/0, schema/1, namespace/0]).
+
+%% API callbacks
+-export(['/bridges'/2, '/bridges/:id'/2,
+ '/nodes/:node/bridges/:id/operation/:operation'/2]).
+
+-export([ list_local_bridges/1
, lookup_from_local_node/2
]).
-define(TYPES, [mqtt, http]).
+
+-define(CONN_TYPES, [mqtt]).
+
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
{BridgeType, BridgeName} -> EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
- ". Bridge ID must be of format 'bridge_type:name'">>}}
+ ". Bridge Ids must be of format {type}:{name}">>}}
end).
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
@@ -53,184 +62,221 @@
rate_max := RATE_MAX
}).
-req_schema() ->
- Schema = [
- case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of
- %% the bridge is not configured, so we have no method to get the schema
- [] -> #{};
- [{_K, Conf} | _] ->
- emqx_mgmt_api_configs:gen_schema(Conf)
- end
- || T <- ?TYPES],
- #{'oneOf' => Schema}.
-
-node_schema() ->
- #{type => string, example => "emqx@127.0.0.1"}.
-
-status_schema() ->
- #{type => string, enum => [connected, disconnected]}.
-
-metrics_schema() ->
- #{ type => object
- , properties => #{
- matched => #{type => integer, example => "0"},
- success => #{type => integer, example => "0"},
- failed => #{type => integer, example => "0"},
- rate => #{type => number, format => float, example => "0.0"},
- rate_last5m => #{type => number, format => float, example => "0.0"},
- rate_max => #{type => number, format => float, example => "0.0"}
- }
- }.
-
-per_node_schema(Key, Schema) ->
- #{
- type => array,
- items => #{
- type => object,
- properties => #{
- node => node_schema(),
- Key => Schema
- }
- }
- }.
-
-resp_schema() ->
- AddMetadata = fun(Prop) ->
- Prop#{status => status_schema(),
- node_status => per_node_schema(status, status_schema()),
- metrics => metrics_schema(),
- node_metrics => per_node_schema(metrics, metrics_schema()),
- id => #{type => string, example => "http:my_http_bridge"},
- bridge_type => #{type => string, enum => ?TYPES},
- node => node_schema()
- }
- end,
- more_props_resp_schema(AddMetadata).
-
-more_props_resp_schema(AddMetadata) ->
- #{'oneOf' := Schema} = req_schema(),
- Schema1 = [S#{properties => AddMetadata(Prop)}
- || S = #{properties := Prop} <- Schema],
- #{'oneOf' => Schema1}.
+namespace() -> "bridge".
api_spec() ->
- {bridge_apis(), []}.
+ emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
-bridge_apis() ->
- [list_all_bridges_api(), crud_bridges_apis(), operation_apis()].
+paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"].
-list_all_bridges_api() ->
- ReqSchema = more_props_resp_schema(fun(Prop) ->
- Prop#{id => #{type => string, required => true}}
- end),
- RespSchema = resp_schema(),
- Metadata = #{
+error_schema(Code, Message) ->
+ [ {code, mk(string(), #{example => Code})}
+ , {message, mk(string(), #{example => Message})}
+ ].
+
+get_response_body_schema() ->
+ emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
+ bridge_info_examples(get)).
+
+param_path_node() ->
+ path_param(node, binary(), atom_to_binary(node(), utf8)).
+
+param_path_operation() ->
+ path_param(operation, enum([start, stop, restart]), <<"start">>).
+
+param_path_id() ->
+ path_param(id, binary(), <<"http:my_http_bridge">>).
+
+path_param(Name, Type, Example) ->
+ {Name, mk(Type,
+ #{ in => path
+ , required => true
+ , example => Example
+ })}.
+
+bridge_info_array_example(Method) ->
+ [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
+
+bridge_info_examples(Method) ->
+ maps:merge(conn_bridge_examples(Method), #{
+ <<"http_bridge">> => #{
+ summary => <<"HTTP Bridge">>,
+ value => info_example(http, awesome, Method)
+ }
+ }).
+
+conn_bridge_examples(Method) ->
+ lists:foldl(fun(Type, Acc) ->
+ SType = atom_to_list(Type),
+ KeyIngress = bin(SType ++ "_ingress"),
+ KeyEgress = bin(SType ++ "_egress"),
+ maps:merge(Acc, #{
+ KeyIngress => #{
+ summary => bin(string:uppercase(SType) ++ " Ingress Bridge"),
+ value => info_example(Type, ingress, Method)
+ },
+ KeyEgress => #{
+ summary => bin(string:uppercase(SType) ++ " Egress Bridge"),
+ value => info_example(Type, egress, Method)
+ }
+ })
+ end, #{}, ?CONN_TYPES).
+
+info_example(Type, Direction, Method) ->
+ maps:merge(info_example_basic(Type, Direction),
+ method_example(Type, Direction, Method)).
+
+method_example(Type, Direction, get) ->
+ SType = atom_to_list(Type),
+ SDir = atom_to_list(Direction),
+ SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ #{
+ id => bin(SType ++ ":" ++ SName),
+ type => bin(SType),
+ name => bin(SName)
+ };
+method_example(Type, Direction, post) ->
+ SType = atom_to_list(Type),
+ SDir = atom_to_list(Direction),
+ SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ #{
+ type => bin(SType),
+ name => bin(SName)
+ };
+method_example(_Type, _Direction, put) ->
+ #{}.
+
+info_example_basic(http, _) ->
+ #{
+ url => <<"http://localhost:9901/messages/${topic}">>,
+ request_timeout => <<"30s">>,
+ connect_timeout => <<"30s">>,
+ max_retries => 3,
+ retry_interval => <<"10s">>,
+ pool_type => <<"random">>,
+ pool_size => 4,
+ enable_pipelining => true,
+ ssl => #{enable => false},
+ from_local_topic => <<"emqx_http/#">>,
+ method => post,
+ body => <<"${payload}">>
+ };
+info_example_basic(mqtt, ingress) ->
+ #{
+ connector => <<"mqtt:my_mqtt_connector">>,
+ direction => ingress,
+ from_remote_topic => <<"aws/#">>,
+ subscribe_qos => 1,
+ to_local_topic => <<"from_aws/${topic}">>,
+ payload => <<"${payload}">>,
+ qos => <<"${qos}">>,
+ retain => <<"${retain}">>
+ };
+info_example_basic(mqtt, egress) ->
+ #{
+ connector => <<"mqtt:my_mqtt_connector">>,
+ direction => egress,
+ from_local_topic => <<"emqx/#">>,
+ to_remote_topic => <<"from_emqx/${topic}">>,
+ payload => <<"${payload}">>,
+ qos => 1,
+ retain => false
+ }.
+
+schema("/bridges") ->
+ #{
+ operationId => '/bridges',
get => #{
+ tags => [<<"bridges">>],
+ summary => <<"List Bridges">>,
description => <<"List all created bridges">>,
responses => #{
- <<"200">> => emqx_mgmt_util:array_schema(resp_schema(),
- <<"A list of the bridges">>)
+ 200 => emqx_dashboard_swagger:schema_with_example(
+ array(emqx_bridge_schema:get_response()),
+ bridge_info_array_example(get))
}
},
post => #{
+ tags => [<<"bridges">>],
+ summary => <<"Create Bridge">>,
description => <<"Create a new bridge">>,
- 'requestBody' => emqx_mgmt_util:schema(ReqSchema),
+ requestBody => emqx_dashboard_swagger:schema_with_examples(
+ emqx_bridge_schema:post_request(),
+ bridge_info_examples(post)),
responses => #{
- <<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>),
- <<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>,
- ['UPDATE_FAILED'])
+ 201 => get_response_body_schema(),
+ 400 => error_schema('BAD_ARG', "Create bridge failed")
}
}
- },
- {"/bridges/", Metadata, list_create_bridges_in_cluster}.
+ };
-crud_bridges_apis() ->
- ReqSchema = req_schema(),
- RespSchema = resp_schema(),
- Metadata = #{
+schema("/bridges/:id") ->
+ #{
+ operationId => '/bridges/:id',
get => #{
+ tags => [<<"bridges">>],
+ summary => <<"Get Bridge">>,
description => <<"Get a bridge by Id">>,
parameters => [param_path_id()],
responses => #{
- <<"200">> => emqx_mgmt_util:array_schema(RespSchema,
- <<"The details of the bridge">>),
- <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
+ 200 => get_response_body_schema(),
+ 404 => error_schema('NOT_FOUND', "Bridge not found")
}
},
put => #{
+ tags => [<<"bridges">>],
+ summary => <<"Update Bridge">>,
description => <<"Update a bridge">>,
parameters => [param_path_id()],
- 'requestBody' => emqx_mgmt_util:schema(ReqSchema),
+ requestBody => emqx_dashboard_swagger:schema_with_examples(
+ emqx_bridge_schema:put_request(),
+ bridge_info_examples(put)),
responses => #{
- <<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>),
- <<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>,
- ['UPDATE_FAILED'])
+ 200 => get_response_body_schema(),
+ 400 => error_schema('BAD_ARG', "Update bridge failed")
}
},
delete => #{
+ tags => [<<"bridges">>],
+ summary => <<"Delete Bridge">>,
description => <<"Delete a bridge">>,
parameters => [param_path_id()],
responses => #{
- <<"204">> => emqx_mgmt_util:schema(<<"Bridge deleted">>),
- <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
+ 204 => <<"Bridge deleted">>
}
}
- },
- {"/bridges/:id", Metadata, crud_bridges_in_cluster}.
+ };
-operation_apis() ->
- Metadata = #{
+schema("/nodes/:node/bridges/:id/operation/:operation") ->
+ #{
+ operationId => '/nodes/:node/bridges/:id/operation/:operation',
post => #{
+ tags => [<<"bridges">>],
+ summary => <<"Start/Stop/Restart Bridge">>,
description => <<"Start/Stop/Restart bridges on a specific node">>,
parameters => [
param_path_node(),
param_path_id(),
- param_path_operation()],
+ param_path_operation()
+ ],
responses => #{
- <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>,
- ['INTERNAL_ERROR']),
- <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
- {"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}.
-
-param_path_node() ->
- #{
- name => node,
- in => path,
- schema => #{type => string},
- required => true,
- example => node()
+ 500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
+ 200 => <<"Operation success">>
+ }
+ }
}.
-param_path_id() ->
- #{
- name => id,
- in => path,
- schema => #{type => string},
- required => true
- }.
-
-param_path_operation()->
- #{
- name => operation,
- in => path,
- required => true,
- schema => #{
- type => string,
- enum => [start, stop, restart]},
- example => restart
- }.
-
-list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
- ?TRY_PARSE_ID(Id,
- case emqx_bridge:lookup(BridgeType, BridgeName) of
- {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
- {error, not_found} ->
- case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of
- ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201);
- {error, Error} -> {400, Error}
- end
- end);
-list_create_bridges_in_cluster(get, _Params) ->
+'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
+ BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
+ case emqx_bridge:lookup(BridgeType, BridgeName) of
+ {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
+ {error, not_found} ->
+ case ensure_bridge_created(BridgeType, BridgeName, Conf) of
+ ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
+ {error, Error} -> {400, Error}
+ end
+ end;
+'/bridges'(get, _Params) ->
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_local_bridges(Node) when Node =:= node() ->
@@ -238,22 +284,22 @@ list_local_bridges(Node) when Node =:= node() ->
list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]).
-crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) ->
- ?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200));
+'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
+ ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
-crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) ->
+'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
- case ensure_bridge(BridgeType, BridgeName, Conf) of
- ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200);
+ case ensure_bridge_created(BridgeType, BridgeName, Conf) of
+ ok -> lookup_from_all_nodes(BridgeType, BridgeName, 200);
{error, Error} -> {400, Error}
end;
{error, not_found} ->
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
end);
-crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
+'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
#{override_to => cluster}) of
@@ -262,12 +308,12 @@ crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
-lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
+lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
{ok, [{ok, _} | _] = Results} ->
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} ->
- {404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)};
+ {404, error_msg('NOT_FOUND', <<"not_found">>)};
{error, ErrL} ->
{500, error_msg('UNKNOWN_ERROR', ErrL)}
end.
@@ -278,7 +324,8 @@ lookup_from_local_node(BridgeType, BridgeName) ->
Error -> Error
end.
-manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
+'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
+ #{node := Node, id := Id, operation := Op}}) ->
OperFun =
fun (<<"start">>) -> start;
(<<"stop">>) -> stop;
@@ -292,9 +339,10 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
-ensure_bridge(BridgeType, BridgeName, Conf) ->
- case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
- #{override_to => cluster}) of
+ensure_bridge_created(BridgeType, BridgeName, Conf) ->
+ Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
+ case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
+ Conf1, #{override_to => cluster}) of
{ok, _} -> ok;
{error, Reason} ->
{error, error_msg('BAD_ARG', Reason)}
@@ -351,7 +399,7 @@ format_resp(#{id := Id, raw_config := RawConf,
RawConf#{
id => Id,
node => node(),
- bridge_type => emqx_bridge:bridge_type(Mod),
+ type => emqx_bridge:bridge_type(Mod),
status => IsConnected(Status),
metrics => Metrics
}.
@@ -379,3 +427,10 @@ error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
+
+bin(S) when is_atom(S) ->
+ atom_to_binary(S, utf8);
+bin(S) when is_list(S) ->
+ list_to_binary(S);
+bin(S) when is_binary(S) ->
+ S.
diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
new file mode 100644
index 000000000..2bef474bd
--- /dev/null
+++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
@@ -0,0 +1,95 @@
+-module(emqx_bridge_http_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-import(hoconsc, [mk/2, enum/1]).
+
+-export([roots/0, fields/1]).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+roots() -> [].
+
+fields("bridge") ->
+ basic_config() ++
+ [ {url, mk(binary(),
+ #{ nullable => false
+ , desc =>"""
+The URL of the HTTP Bridge.
+Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
+or port part.
+For example, http://localhost:9901/${topic}
is allowed, but
+ http://${host}:9901/message
or http://localhost:${port}/message
+is not allowed.
+"""
+ })}
+ , {from_local_topic, mk(binary(),
+ #{ desc =>"""
+The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
+match the from_local_topic will be forwarded.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
+from_local_topic will be forwarded.
+"""
+ })}
+ , {method, mk(method(),
+ #{ default => post
+ , desc =>"""
+The method of the HTTP request. All the available methods are: post, put, get, delete.
+Template with variables is allowed.
+"""
+ })}
+ , {headers, mk(map(),
+ #{ default => #{
+ <<"accept">> => <<"application/json">>,
+ <<"cache-control">> => <<"no-cache">>,
+ <<"connection">> => <<"keep-alive">>,
+ <<"content-type">> => <<"application/json">>,
+ <<"keep-alive">> => <<"timeout=5">>}
+ , desc =>"""
+The headers of the HTTP request.
+Template with variables is allowed.
+"""
+ })
+ }
+ , {body, mk(binary(),
+ #{ default => <<"${payload}">>
+ , desc =>"""
+The body of the HTTP request.
+Template with variables is allowed.
+"""
+ })}
+ , {request_timeout, mk(emqx_schema:duration_ms(),
+ #{ default => <<"30s">>
+ , desc =>"""
+How long will the HTTP request timeout.
+"""
+ })}
+ ];
+
+fields("post") ->
+ [ type_field()
+ , name_field()
+ ] ++ fields("bridge");
+
+fields("put") ->
+ fields("bridge");
+
+fields("get") ->
+ [ id_field()
+ ] ++ fields("post").
+
+basic_config() ->
+ proplists:delete(base_url, emqx_connector_http:fields(config)).
+
+%%======================================================================================
+id_field() ->
+ {id, mk(binary(), #{desc => "The Bridge Id", example => "http:my_http_bridge"})}.
+
+type_field() ->
+ {type, mk(http, #{desc => "The Bridge Type"})}.
+
+name_field() ->
+ {name, mk(binary(), #{desc => "The Bridge Name"})}.
+
+method() ->
+ enum([post, put, get, delete]).
diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
new file mode 100644
index 000000000..d2cf6b1a8
--- /dev/null
+++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
@@ -0,0 +1,62 @@
+-module(emqx_bridge_mqtt_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-import(hoconsc, [mk/2]).
+
+-export([roots/0, fields/1]).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+roots() -> [].
+
+fields("ingress") ->
+ [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
+ , emqx_bridge_schema:connector_name()
+ ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
+
+fields("egress") ->
+ [ direction(egress, emqx_connector_mqtt_schema:egress_desc())
+ , emqx_bridge_schema:connector_name()
+ ] ++ emqx_connector_mqtt_schema:fields("egress");
+
+fields("post_ingress") ->
+ [ type_field()
+ , name_field()
+ ] ++ fields("ingress");
+fields("post_egress") ->
+ [ type_field()
+ , name_field()
+ ] ++ fields("egress");
+
+fields("put_ingress") ->
+ fields("ingress");
+fields("put_egress") ->
+ fields("egress");
+
+fields("get_ingress") ->
+ [ id_field()
+ ] ++ fields("post_ingress");
+fields("get_egress") ->
+ [ id_field()
+ ] ++ fields("post_egress").
+
+%%======================================================================================
+direction(Dir, Desc) ->
+ {direction, mk(Dir,
+ #{ nullable => false
+ , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
"
+ ++ Desc
+ })}.
+
+id_field() ->
+ {id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}.
+
+type_field() ->
+ {type, mk(mqtt, #{desc => "The Bridge Type"})}.
+
+name_field() ->
+ {name, mk(binary(),
+ #{ desc => "The Bridge Name"
+ , example => "some_bridge_name"
+ })}.
diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl
index 3a3151ef0..2038e6c04 100644
--- a/apps/emqx_bridge/src/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl
@@ -2,105 +2,27 @@
-include_lib("typerefl/include/types.hrl").
+-import(hoconsc, [mk/2, ref/2]).
+
-export([roots/0, fields/1]).
+-export([ get_response/0
+ , put_request/0
+ , post_request/0
+ ]).
+
+-export([ connector_name/0
+ ]).
+
%%======================================================================================
%% Hocon Schema Definitions
-roots() -> [bridges].
+-define(CONN_TYPES, [mqtt]).
-fields(bridges) ->
- [ {mqtt,
- sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge")
- , ref("egress_mqtt_bridge")
- ])),
- #{ desc => "MQTT bridges"
- })}
- , {http,
- sc(hoconsc:map(name, ref("http_bridge")),
- #{ desc => "HTTP bridges"
- })}
- ];
-
-fields("ingress_mqtt_bridge") ->
- [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
- , connector_name()
- ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
-
-fields("egress_mqtt_bridge") ->
- [ direction(egress, emqx_connector_mqtt_schema:egress_desc())
- , connector_name()
- ] ++ emqx_connector_mqtt_schema:fields("egress");
-
-fields("http_bridge") ->
- basic_config_http() ++
- [ {url,
- sc(binary(),
- #{ nullable => false
- , desc =>"""
-The URL of the HTTP Bridge.
-Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
-or port part.
-For example, http://localhost:9901/${topic}
is allowed, but
- http://${host}:9901/message
or http://localhost:${port}/message
-is not allowed.
-"""
- })}
- , {from_local_topic,
- sc(binary(),
- #{ desc =>"""
-The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
-match the from_local_topic will be forwarded.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
-from_local_topic will be forwarded.
-"""
- })}
- , {method,
- sc(method(),
- #{ default => post
- , desc =>"""
-The method of the HTTP request. All the available methods are: post, put, get, delete.
-Template with variables is allowed.
-"""
- })}
- , {headers,
- sc(map(),
- #{ default => #{
- <<"accept">> => <<"application/json">>,
- <<"cache-control">> => <<"no-cache">>,
- <<"connection">> => <<"keep-alive">>,
- <<"content-type">> => <<"application/json">>,
- <<"keep-alive">> => <<"timeout=5">>}
- , desc =>"""
-The headers of the HTTP request.
-Template with variables is allowed.
-"""
- })
- }
- , {body,
- sc(binary(),
- #{ default => <<"${payload}">>
- , desc =>"""
-The body of the HTTP request.
-Template with variables is allowed.
-"""
- })}
- , {request_timeout,
- sc(emqx_schema:duration_ms(),
- #{ default => <<"30s">>
- , desc =>"""
-How long will the HTTP request timeout.
-"""
- })}
- ].
-
-direction(Dir, Desc) ->
- {direction,
- sc(Dir,
- #{ nullable => false
- , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++
- Desc
- })}.
+%%======================================================================================
+%% For HTTP APIs
+get_response() ->
+ http_schema("get").
connector_name() ->
{connector,
@@ -108,17 +30,35 @@ connector_name() ->
#{ nullable => false
, desc =>"""
The connector name to be used for this bridge.
-Connectors are configured as 'connectors.type.name',
+Connectors are configured as 'connectors.{type}.{name}',
for example 'connectors.http.mybridge'.
"""
})}.
-basic_config_http() ->
- proplists:delete(base_url, emqx_connector_http:fields(config)).
+put_request() ->
+ http_schema("put").
-method() ->
- hoconsc:enum([post, put, get, delete]).
+post_request() ->
+ http_schema("post").
-sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+http_schema(Method) ->
+ Schemas = lists:flatmap(fun(Type) ->
+ [ref(schema_mod(Type), Method ++ "_ingress"),
+ ref(schema_mod(Type), Method ++ "_egress")]
+ end, ?CONN_TYPES),
+ hoconsc:union([ref(emqx_bridge_http_schema, Method)
+ | Schemas]).
-ref(Field) -> hoconsc:ref(?MODULE, Field).
+%%======================================================================================
+%% For config files
+roots() -> [bridges].
+
+fields(bridges) ->
+ [{http, mk(hoconsc:map(name, ref(emqx_bridge_http_schema, "bridge")), #{})}]
+ ++ [{T, mk(hoconsc:map(name, hoconsc:union([
+ ref(schema_mod(T), "ingress"),
+ ref(schema_mod(T), "egress")
+ ])), #{})} || T <- ?CONN_TYPES].
+
+schema_mod(Type) ->
+ list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index bc865906f..1510f439e 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -62,7 +62,7 @@ post_request_body_schema() ->
connector_info(post_req), connector_info_examples()).
get_response_body_schema() ->
- emqx_dashboard_swagger:schema_with_example(
+ emqx_dashboard_swagger:schema_with_examples(
connector_info(), connector_info_examples()).
connector_info() ->
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 190456262..9d11d7ac0 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -100,7 +100,7 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
- clientid => clientid(maps:get(clientid, Conf, InstanceId)),
+ clientid => clientid(maps:get(clientid, Conf, InstId)),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
@@ -190,4 +190,4 @@ basic_config(#{
}.
clientid(Id) ->
- unicode:characters_to_binary([Id, ":", atom_to_list(node())], utf8).
+ iolist_to_binary([Id, ":", atom_to_list(node())]).
From e1794fbce6ab8a95f689766c0e827e7e981d2d6b Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Mon, 6 Dec 2021 14:53:33 +0800
Subject: [PATCH 06/10] fix(connector): create different schema for
POST,PUT,GET methods
---
apps/emqx_bridge/src/emqx_bridge_api.erl | 6 +-
.../emqx_connector/src/emqx_connector_api.erl | 99 ++++++++++++-------
.../src/emqx_connector_mqtt.erl | 22 ++++-
.../src/emqx_connector_schema.erl | 46 ++++++---
.../test/emqx_connector_api_SUITE.erl | 62 +++++++++---
5 files changed, 163 insertions(+), 72 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index a5c24be9f..95b9c8759 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -394,12 +394,14 @@ aggregate_metrics(AllMetrics) ->
end, InitMetrics, AllMetrics).
format_resp(#{id := Id, raw_config := RawConf,
- resource_data := #{mod := Mod, status := Status, metrics := Metrics}}) ->
+ resource_data := #{status := Status, metrics := Metrics}}) ->
+ {Type, Name} = emqx_bridge:parse_bridge_id(Id),
IsConnected = fun(started) -> connected; (_) -> disconnected end,
RawConf#{
id => Id,
+ type => Type,
+ name => Name,
node => node(),
- type => emqx_bridge:bridge_type(Mod),
status => IsConnected(Status),
metrics => Metrics
}.
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 1510f439e..fe28dde9d 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -30,6 +30,8 @@
%% API callbacks
-export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]).
+-define(CONN_TYPES, [mqtt]).
+
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_connector:parse_connector_id(Id) of
{ConnType, ConnName} ->
@@ -55,43 +57,54 @@ error_schema(Code, Message) ->
put_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
- connector_info(put_req), connector_info_examples()).
+ emqx_connector_schema:put_request(), connector_info_examples(put)).
post_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
- connector_info(post_req), connector_info_examples()).
+ emqx_connector_schema:post_request(), connector_info_examples(post)).
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
- connector_info(), connector_info_examples()).
+ emqx_connector_schema:get_response(), connector_info_examples(get)).
-connector_info() ->
- connector_info(resp).
+connector_info_array_example(Method) ->
+ [Config || #{value := Config} <- maps:values(connector_info_examples(Method))].
-connector_info(resp) ->
- hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info")
- ]);
-connector_info(put_req) ->
- hoconsc:union([ ref(emqx_connector_mqtt_schema, "connector")
- ]);
-connector_info(post_req) ->
- hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector")
- ]).
+connector_info_examples(Method) ->
+ lists:foldl(fun(Type, Acc) ->
+ SType = atom_to_list(Type),
+ maps:merge(Acc, #{
+ Type => #{
+ summary => bin(string:uppercase(SType) ++ " Connector"),
+ value => info_example(Type, Method)
+ }
+ })
+ end, #{}, ?CONN_TYPES).
-connector_info_array_example() ->
- [Config || #{value := Config} <- maps:values(connector_info_examples())].
+info_example(Type, Method) ->
+ maps:merge(info_example_basic(Type),
+ method_example(Type, Method)).
-connector_info_examples() ->
+method_example(Type, get) ->
+ SType = atom_to_list(Type),
+ SName = "my_" ++ SType ++ "_connector",
#{
- mqtt => #{
- summary => <<"MQTT Bridge">>,
- value => mqtt_info_example()
- }
- }.
-
-mqtt_info_example() ->
+ id => bin(SType ++ ":" ++ SName),
+ type => bin(SType),
+ name => bin(SName)
+ };
+method_example(Type, post) ->
+ SType = atom_to_list(Type),
+ SName = "my_" ++ SType ++ "_connector",
+ #{
+ type => bin(SType),
+ name => bin(SName)
+ };
+method_example(_Type, put) ->
+ #{}.
+
+info_example_basic(mqtt) ->
#{
- type => <<"mqtt">>,
server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"30s">>,
proto_ver => <<"v4">>,
@@ -136,8 +149,8 @@ schema("/connectors") ->
summary => <<"List connectors">>,
responses => #{
200 => emqx_dashboard_swagger:schema_with_example(
- array(connector_info()),
- connector_info_array_example())
+ array(emqx_connector_schema:get_response()),
+ connector_info_array_example(get))
}
},
post => #{
@@ -198,17 +211,20 @@ schema("/connectors/:id") ->
'/connectors'(get, _Request) ->
{200, emqx_connector:list()};
-'/connectors'(post, #{body := #{<<"id">> := Id} = Params}) ->
- ?TRY_PARSE_ID(Id,
- case emqx_connector:lookup(ConnType, ConnName) of
- {ok, _} ->
- {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
- {error, not_found} ->
- case emqx_connector:update(ConnType, ConnName, maps:remove(<<"id">>, Params)) of
- {ok, #{raw_config := RawConf}} -> {201, RawConf#{<<"id">> => Id}};
- {error, Error} -> {400, error_msg('BAD_ARG', Error)}
- end
- end).
+'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
+ ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()),
+ case emqx_connector:lookup(ConnType, ConnName) of
+ {ok, _} ->
+ {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
+ {error, not_found} ->
+ case emqx_connector:update(ConnType, ConnName,
+ maps:without([<<"type">>, <<"name">>], Params)) of
+ {ok, #{raw_config := RawConf}} ->
+ {201, RawConf#{<<"id">> =>
+ emqx_connector:connector_id(ConnType, ConnName)}};
+ {error, Error} -> {400, error_msg('BAD_ARG', Error)}
+ end
+ end.
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
@@ -246,3 +262,10 @@ error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
+
+bin(S) when is_atom(S) ->
+ atom_to_binary(S, utf8);
+bin(S) when is_list(S) ->
+ list_to_binary(S);
+bin(S) when is_binary(S) ->
+ S.
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 9d11d7ac0..2cce0d195 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -40,6 +40,8 @@
-behaviour(hocon_schema).
+-import(hoconsc, [mk/2]).
+
-export([ roots/0
, fields/1]).
@@ -49,7 +51,25 @@ roots() ->
fields("config").
fields("config") ->
- emqx_connector_mqtt_schema:fields("config").
+ emqx_connector_mqtt_schema:fields("config");
+
+fields("get") ->
+ [{id, mk(binary(),
+ #{ desc => "The connector Id"
+ , example => <<"mqtt:my_mqtt_connector">>
+ })}]
+ ++ fields("post");
+
+fields("put") ->
+ emqx_connector_mqtt_schema:fields("connector");
+
+fields("post") ->
+ [ {type, mk(mqtt, #{desc => "The Connector Type"})}
+ , {name, mk(binary(),
+ #{ desc => "The Connector Name"
+ , example => <<"my_mqtt_connector">>
+ })}
+ ] ++ fields("put").
%% ===================================================================
%% supervisor APIs
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
index 518d4e62d..c386a829f 100644
--- a/apps/emqx_connector/src/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -4,8 +4,33 @@
-include_lib("typerefl/include/types.hrl").
+-import(hoconsc, [mk/2, ref/2]).
+
-export([roots/0, fields/1]).
+-export([ get_response/0
+ , put_request/0
+ , post_request/0
+ ]).
+
+-define(CONN_TYPES, [mqtt]).
+
+%%======================================================================================
+%% For HTTP APIs
+
+get_response() ->
+ http_schema("get").
+
+put_request() ->
+ http_schema("put").
+
+post_request() ->
+ http_schema("post").
+
+http_schema(Method) ->
+ Schemas = [ref(schema_mod(Type), Method) || Type <- ?CONN_TYPES],
+ hoconsc:union(Schemas).
+
%%======================================================================================
%% Hocon Schema Definitions
@@ -14,23 +39,12 @@ roots() -> ["connectors"].
fields(connectors) -> fields("connectors");
fields("connectors") ->
[ {mqtt,
- sc(hoconsc:map(name,
- hoconsc:union([ ref("mqtt_connector")
+ mk(hoconsc:map(name,
+ hoconsc:union([ ref(emqx_connector_mqtt_schema, "connector")
])),
#{ desc => "MQTT bridges"
})}
- ];
+ ].
-fields("mqtt_connector") ->
- [ {type, sc(mqtt, #{desc => "The Connector Type"})}
- %, {name, sc(binary(), #{desc => "The Connector Name"})}
- ]
- ++ emqx_connector_mqtt_schema:fields("connector");
-
-fields("mqtt_connector_info") ->
- [{id, sc(binary(), #{desc => "The connector Id", example => "mqtt:foo"})}]
- ++ fields("mqtt_connector").
-
-sc(Type, Meta) -> hoconsc:mk(Type, Meta).
-
-ref(Field) -> hoconsc:ref(?MODULE, Field).
+schema_mod(Type) ->
+ list_to_atom(lists:concat(["emqx_connector_", Type])).
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
index 96f530563..bbac76674 100644
--- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -24,7 +24,11 @@
-define(CONF_DEFAULT, <<"connectors: {}">>).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
+-define(CONNECTR_TYPE, <<"mqtt">>).
+-define(CONNECTR_NAME, <<"test_connector">>).
-define(CONNECTR_ID, <<"mqtt:test_connector">>).
+-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>).
+-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>).
-define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
-define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
-define(MQTT_CONNECOTR(Username),
@@ -63,8 +67,8 @@
-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX),
#{<<"matched">> := MATCH, <<"success">> := SUCC,
- <<"failed">> := FAILED, <<"speed">> := SPEED,
- <<"speed_last5m">> := SPEED5M, <<"speed_max">> := SPEEDMAX}).
+ <<"failed">> := FAILED, <<"rate">> := SPEED,
+ <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}).
all() ->
emqx_common_test_helpers:all(?MODULE).
@@ -115,7 +119,9 @@ t_mqtt_crud_apis(_) ->
%% POST /connectors/ will create a connector
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
- ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+ ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
+ , <<"name">> => ?CONNECTR_NAME
+ }),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@@ -128,7 +134,9 @@ t_mqtt_crud_apis(_) ->
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["connectors"]),
- ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+ ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
+ , <<"name">> => ?CONNECTR_NAME
+ }),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"connector already exists">>
@@ -187,7 +195,9 @@ t_mqtt_conn_bridge_ingress(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
- ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+ ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
+ , <<"name">> => ?CONNECTR_NAME
+ }),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@@ -201,11 +211,14 @@ t_mqtt_conn_bridge_ingress(_) ->
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_INGRESS}),
+ ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{
+ <<"type">> => ?CONNECTR_TYPE,
+ <<"name">> => ?BRIDGE_NAME_INGRESS
+ }),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
- , <<"bridge_type">> := <<"mqtt">>
+ , <<"type">> := <<"mqtt">>
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
@@ -250,7 +263,9 @@ t_mqtt_conn_bridge_egress(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
- ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+ ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
+ , <<"name">> => ?CONNECTR_NAME
+ }),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@@ -264,11 +279,15 @@ t_mqtt_conn_bridge_egress(_) ->
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
+ ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
+ <<"type">> => ?CONNECTR_TYPE,
+ <<"name">> => ?BRIDGE_NAME_EGRESS
+ }),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
- , <<"bridge_type">> := <<"mqtt">>
+ , <<"type">> := ?CONNECTR_TYPE
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
@@ -322,7 +341,10 @@ t_mqtt_conn_update(_) ->
%% then we add a mqtt connector, using POST
{ok, 201, Connector} = request(post, uri(["connectors"]),
- ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"id">> => ?CONNECTR_ID}),
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
+ #{ <<"type">> => ?CONNECTR_TYPE
+ , <<"name">> => ?CONNECTR_NAME
+ }),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
@@ -332,9 +354,13 @@ t_mqtt_conn_update(_) ->
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
+ ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
+ <<"type">> => ?CONNECTR_TYPE,
+ <<"name">> => ?BRIDGE_NAME_EGRESS
+ }),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
- , <<"bridge_type">> := <<"mqtt">>
+ , <<"type">> := <<"mqtt">>
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
@@ -358,9 +384,15 @@ t_mqtt_conn_testing(_) ->
%% APIs for testing the connectivity
%% then we add a mqtt connector, using POST
{ok, 200, <<>>} = request(post, uri(["connectors_test"]),
- ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"bridge_type">> => <<"mqtt">>}),
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{
+ <<"type">> => ?CONNECTR_TYPE,
+ <<"name">> => ?BRIDGE_NAME_EGRESS
+ }),
{ok, 400, _} = request(post, uri(["connectors_test"]),
- ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{<<"bridge_type">> => <<"mqtt">>}).
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{
+ <<"type">> => ?CONNECTR_TYPE,
+ <<"name">> => ?BRIDGE_NAME_EGRESS
+ }).
%%--------------------------------------------------------------------
%% HTTP Request
From 9b4fe87ed0bcf09e4ce3e1cf85d7a469e1c7b7b6 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Mon, 6 Dec 2021 15:41:31 +0800
Subject: [PATCH 07/10] fix(bridges): update testcases for bridge APIs
---
.../test/emqx_bridge_api_SUITE.erl | 80 +++++++++++--------
1 file changed, 48 insertions(+), 32 deletions(-)
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 23d4691f5..52c8a32de 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -21,7 +21,9 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>).
--define(TEST_ID, <<"http:test_bridge">>).
+-define(BRIDGE_TYPE, <<"http">>).
+-define(BRIDGE_NAME, <<"test_bridge">>).
+-define(BRIDGE_ID, <<"http:test_bridge">>).
-define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))).
@@ -134,11 +136,15 @@ t_http_crud_apis(_) ->
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
+ ?HTTP_BRIDGE(URL1)#{
+ <<"type">> => ?BRIDGE_TYPE,
+ <<"name">> => ?BRIDGE_NAME
+ }),
%ct:pal("---bridge: ~p", [Bridge]),
- ?assertMatch(#{ <<"id">> := ?TEST_ID
- , <<"bridge_type">> := <<"http">>
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ , <<"type">> := ?BRIDGE_TYPE
+ , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@@ -148,7 +154,10 @@ t_http_crud_apis(_) ->
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["bridges"]),
- ?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
+ ?HTTP_BRIDGE(URL1)#{
+ <<"type">> => ?BRIDGE_TYPE,
+ <<"name">> => ?BRIDGE_NAME
+ }),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge already exists">>
@@ -156,10 +165,11 @@ t_http_crud_apis(_) ->
%% update the request-path of the bridge
URL2 = ?URL(Port, "path2"),
- {ok, 200, Bridge2} = request(put, uri(["bridges", ?TEST_ID]),
+ {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]),
?HTTP_BRIDGE(URL2)),
- ?assertMatch(#{ <<"id">> := ?TEST_ID
- , <<"bridge_type">> := <<"http">>
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ , <<"type">> := ?BRIDGE_TYPE
+ , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@@ -169,8 +179,9 @@ t_http_crud_apis(_) ->
%% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
- ?assertMatch([#{ <<"id">> := ?TEST_ID
- , <<"bridge_type">> := <<"http">>
+ ?assertMatch([#{ <<"id">> := ?BRIDGE_ID
+ , <<"type">> := ?BRIDGE_TYPE
+ , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@@ -179,9 +190,10 @@ t_http_crud_apis(_) ->
}], jsx:decode(Bridge2Str)),
%% get the bridge by id
- {ok, 200, Bridge3Str} = request(get, uri(["bridges", ?TEST_ID]), []),
- ?assertMatch(#{ <<"id">> := ?TEST_ID
- , <<"bridge_type">> := <<"http">>
+ {ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ , <<"type">> := ?BRIDGE_TYPE
+ , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@@ -190,11 +202,11 @@ t_http_crud_apis(_) ->
}, jsx:decode(Bridge3Str)),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% update a deleted bridge returns an error
- {ok, 404, ErrMsg2} = request(put, uri(["bridges", ?TEST_ID]),
+ {ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]),
?HTTP_BRIDGE(URL2)),
?assertMatch(
#{ <<"code">> := _
@@ -206,11 +218,15 @@ t_start_stop_bridges(_) ->
Port = start_http_server(fun handle_fun_200_ok/1),
URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
+ ?HTTP_BRIDGE(URL1)#{
+ <<"type">> => ?BRIDGE_TYPE,
+ <<"name">> => ?BRIDGE_NAME
+ }),
%ct:pal("the bridge ==== ~p", [Bridge]),
?assertMatch(
- #{ <<"id">> := ?TEST_ID
- , <<"bridge_type">> := <<"http">>
+ #{ <<"id">> := ?BRIDGE_ID
+ , <<"type">> := ?BRIDGE_TYPE
+ , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
@@ -219,42 +235,42 @@ t_start_stop_bridges(_) ->
}, jsx:decode(Bridge)),
%% stop it
{ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
+ uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
<<"">>),
- {ok, 200, Bridge2} = request(get, uri(["bridges", ?TEST_ID]), []),
- ?assertMatch(#{ <<"id">> := ?TEST_ID
+ {ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
{ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "start"]),
+ uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "start"]),
<<"">>),
- {ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
- ?assertMatch(#{ <<"id">> := ?TEST_ID
+ {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
{ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
+ uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
<<"">>),
- {ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
- ?assertMatch(#{ <<"id">> := ?TEST_ID
+ {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
{ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
+ uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
<<"">>),
%% restart a stopped bridge
{ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
+ uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
<<"">>),
- {ok, 200, Bridge4} = request(get, uri(["bridges", ?TEST_ID]), []),
- ?assertMatch(#{ <<"id">> := ?TEST_ID
+ {ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%--------------------------------------------------------------------
From 6903997b9440d8429306e0a97e1fa84a1b2b7838 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Mon, 6 Dec 2021 18:44:27 +0800
Subject: [PATCH 08/10] feat(connector): add new option 'mode' to mqtt
connectors
---
apps/emqx_connector/etc/emqx_connector.conf | 2 +-
.../emqx_connector/src/emqx_connector_api.erl | 2 +-
.../src/emqx_connector_mqtt.erl | 3 +--
.../src/mqtt/emqx_connector_mqtt_schema.erl | 26 ++++++++++++++-----
4 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf
index 06395ac94..8929598be 100644
--- a/apps/emqx_connector/etc/emqx_connector.conf
+++ b/apps/emqx_connector/etc/emqx_connector.conf
@@ -1,4 +1,5 @@
#connectors.mqtt.my_mqtt_connector {
+# mode = cluster_shareload
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# username = "username1"
@@ -8,7 +9,6 @@
# retry_interval = "30s"
# max_inflight = 32
# reconnect_interval = "30s"
-# bridge_mode = true
# replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB"
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index fe28dde9d..cfe52d279 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -105,10 +105,10 @@ method_example(_Type, put) ->
info_example_basic(mqtt) ->
#{
+ mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"30s">>,
proto_ver => <<"v4">>,
- bridge_mode => true,
username => <<"foo">>,
password => <<"bar">>,
clientid => <<"foo">>,
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 2cce0d195..6bc609fa8 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -182,7 +182,6 @@ basic_config(#{
server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
- bridge_mode := BridgeMod,
username := User,
password := Password,
clean_start := CleanStart,
@@ -197,7 +196,7 @@ basic_config(#{
server => Server,
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
- bridge_mode => BridgeMod,
+ bridge_mode => true,
username => User,
password => Password,
clean_start => CleanStart,
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 415c6fa1a..2338129d1 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -8,7 +8,7 @@
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
+%% cluster_shareload under the License is cluster_shareload on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
@@ -38,7 +38,24 @@ fields("config") ->
topic_mappings();
fields("connector") ->
- [ {server,
+ [ {mode,
+ sc(hoconsc:enum([cluster_singleton, cluster_shareload]),
+ #{ default => cluster_shareload
+ , desc => """
+The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'
+
+- cluster_singleton: create an unique MQTT connection within the emqx cluster.
+In 'cluster_singleton' node, all messages toward the remote broker go through the same
+MQTT connection.
+- cluster_shareload: create an MQTT connection on each node in the emqx cluster.
+In 'cluster_shareload' mode, the incomming load from the remote broker is shared by
+using shared subscription.
+Note that the 'clientid' is suffixed by the node name, this is to avoid
+clientid conflicts between different nodes. And we can only use shared subscription
+topic filters for 'from_remote_topic'.
+"""
+ })}
+ , {server,
sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker"
@@ -49,11 +66,6 @@ fields("connector") ->
#{ default => v4
, desc => "The MQTT protocol version"
})}
- , {bridge_mode,
- sc(boolean(),
- #{ default => true
- , desc => "The bridge mode of the MQTT protocol"
- })}
, {username,
sc(binary(),
#{ default => "emqx"
From cc96880f188169334d1295c9609d634d43a3c1bf Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Tue, 7 Dec 2021 09:23:22 +0800
Subject: [PATCH 09/10] fix(dialyzer): some dialyzer issue
---
apps/emqx_bridge/src/emqx_bridge_api.erl | 8 ++------
apps/emqx_connector/src/emqx_connector_api.erl | 8 ++------
2 files changed, 4 insertions(+), 12 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 95b9c8759..5b2b62d82 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -428,11 +428,7 @@ rpc_call(Node, Mod, Fun, Args) ->
error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
- #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
+ #{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
-bin(S) when is_atom(S) ->
- atom_to_binary(S, utf8);
bin(S) when is_list(S) ->
- list_to_binary(S);
-bin(S) when is_binary(S) ->
- S.
+ list_to_binary(S).
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index cfe52d279..95bc33a83 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -261,11 +261,7 @@ schema("/connectors/:id") ->
error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
- #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
+ #{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
-bin(S) when is_atom(S) ->
- atom_to_binary(S, utf8);
bin(S) when is_list(S) ->
- list_to_binary(S);
-bin(S) when is_binary(S) ->
- S.
+ list_to_binary(S).
From 098d8eacb7311910228c6901bce1546086144bf4 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Tue, 7 Dec 2021 09:26:09 +0800
Subject: [PATCH 10/10] fix(metrics): update testcases for plugin_libs_metrics
---
apps/emqx_bridge/src/emqx_bridge_schema.erl | 2 +-
.../test/emqx_plugin_libs_metrics_SUITE.erl | 12 ++++++------
2 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl
index 2038e6c04..ec875d0a4 100644
--- a/apps/emqx_bridge/src/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl
@@ -26,7 +26,7 @@ get_response() ->
connector_name() ->
{connector,
- sc(binary(),
+ mk(binary(),
#{ nullable => false
, desc =>"""
The connector name to be used for this bridge.
diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl
index 3a74cd232..3f8a63f25 100644
--- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl
+++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl
@@ -24,7 +24,7 @@
all() ->
[ {group, metrics}
- , {group, speed} ].
+ , {group, rate} ].
suite() ->
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
@@ -34,8 +34,8 @@ groups() ->
[ t_rule
, t_no_creation_1
]},
- {speed, [sequence],
- [ rule_speed
+ {rate, [sequence],
+ [ rule_rate
]}
].
@@ -74,7 +74,7 @@ t_rule(_) ->
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
-rule_speed(_) ->
+rule_rate(_) ->
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
@@ -83,11 +83,11 @@ rule_speed(_) ->
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
ct:sleep(1000),
?LET(#{max := Max, current := Current},
- emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
+ emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current =< 2)}),
ct:sleep(2100),
- ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
+ ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current == 0),
?assert(Last5Min =< 0.67)}),