Merge remote-tracking branch 'emqx/release-5.0-beta.3' into merge-5.0-beta.3-to-master

commit 9a115b99a4
Author: JimMoen
Date:   2022-01-06 14:08:45 +08:00

27 changed files with 464 additions and 267 deletions


@@ -61,7 +61,7 @@ jobs:
       - uses: actions/checkout@v2
         with:
           repository: emqx/emqx-fvt
-          ref: 1.0.3-dev1
+          ref: 1.0.3-dev2
           path: .
       - uses: actions/setup-java@v1
         with:


@@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-2:23.3.4.9-3-alpine3
 export EMQX_DEFAULT_RUNNER = alpine:3.14
 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
 export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
-export EMQX_DASHBOARD_VERSION ?= v0.14.0
+export EMQX_DASHBOARD_VERSION ?= v0.17.0
 export DOCKERFILE := deploy/docker/Dockerfile
 export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing
 ifeq ($(OS),Windows_NT)


@@ -24,8 +24,4 @@
 %% NOTE: This version number should be manually bumped for each release
-%% NOTE: This version number should have 3 numeric parts
-%% (Major.Minor.Patch), and extra info can be added after a final
-%% hyphen.
--define(EMQX_RELEASE, "5.0.0-beta.2").
+-define(EMQX_RELEASE, "5.0-beta.3").


@@ -0,0 +1,35 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_limiter_correction).

%% API
-export([ add/2 ]).

-type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float()
                             , any() => any()
                             }.

-export_type([correction_value/0]).

%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
-spec add(number(), correction_value()) -> {integer(), correction_value()}.
add(Inc, #{correction := Correction} = Data) ->
    FixedInc = Inc + Correction,
    IntInc = erlang:floor(FixedInc),
    {IntInc, Data#{correction := FixedInc - IntInc}}.
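A minimal usage sketch of the new module (not part of the commit; the function name example/0 is hypothetical): the fractional remainder is carried in the correction field until it adds up to a whole token.

%% Hypothetical illustration only: repeated fractional increments yield an
%% integer once the carried correction crosses 1.0.
example() ->
    S0 = #{correction => 0.0},
    {0, S1} = emqx_limiter_correction:add(0.4, S0),  %% 0.4 -> floor 0, carry ~0.4
    {0, S2} = emqx_limiter_correction:add(0.4, S1),  %% 0.8 -> floor 0, carry ~0.8
    {1, _S} = emqx_limiter_correction:add(0.4, S2),  %% 1.2 -> floor 1, carry ~0.2
    ok.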


@@ -20,7 +20,7 @@
 %% API
 -export([ add/2, sub/2, mul/2
-        , add_to_counter/3, put_to_counter/3, floor_div/2]).
+        , put_to_counter/3, floor_div/2]).

 -export_type([decimal/0, zero_or_float/0]).

 -type decimal() :: infinity | number().
@@ -60,22 +60,6 @@ floor_div(infinity, _) ->
 floor_div(A, B) ->
     erlang:floor(A / B).

--spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) ->
-    {zero_or_float(), zero_or_float()}.
-add_to_counter(_, _, infinity) ->
-    {0, 0};
-add_to_counter(Counter, Index, Val) when is_float(Val) ->
-    IntPart = erlang:floor(Val),
-    if IntPart > 0 ->
-            counters:add(Counter, Index, IntPart);
-       true ->
-            ok
-    end,
-    {IntPart, Val - IntPart};
-add_to_counter(Counter, Index, Val) ->
-    counters:add(Counter, Index, Val),
-    {Val, 0}.
-
 -spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok.
 put_to_counter(_, _, infinity) ->
     ok;


@@ -87,7 +87,7 @@
 -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter

 -export_type([index/0]).
--import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]).
+-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).

 %%--------------------------------------------------------------------
 %% API
@@ -317,12 +317,11 @@ longitudinal(#{id := Id,
 longitudinal(#{id := Id,
                rate := Rate,
                capacity := Capacity,
-               correction := Correction,
                counter := Counter,
                index := Index,
                obtained := Obtained} = Node,
              InFlow, Nodes) when Counter =/= undefined ->
-    Flow = add(erlang:min(InFlow, Rate), Correction),
+    Flow = erlang:min(InFlow, Rate),
     ShouldAlloc =
         case counters:get(Counter, Index) of
@@ -340,11 +339,11 @@ longitudinal(#{id := Id,
         Avaiable when Avaiable > 0 ->
            %% XXX if capacity is infinity, and flow always > 0, the value in counter
            %% will be overflow at some point in the future, do we need to deal with this situation???
-            {Alloced, Decimal} = add_to_counter(Counter, Index, Avaiable),
-            {Alloced,
-             Nodes#{Id := Node#{obtained := Obtained + Alloced,
-                                correction := Decimal}}};
+            {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node),
+            counters:add(Counter, Index, Inc),
+            {Inc,
+             Nodes#{Id := Node2#{obtained := Obtained + Inc}}};
         _ ->
             {0, Nodes}
     end;
@@ -411,31 +410,38 @@ dispatch_burst([], State) ->
 dispatch_burst(GroupL,
                #{root := #{burst := Burst},
                  nodes := Nodes} = State) ->
-    InFlow = erlang:floor(Burst / erlang:length(GroupL)),
+    InFlow = Burst / erlang:length(GroupL),
     Dispatch = fun({Zone, Childs}, NodeAcc) ->
                        #{id := ZoneId,
                          burst := ZoneBurst,
                          obtained := Obtained} = Zone,
-                       ZoneFlow = erlang:min(InFlow, ZoneBurst),
-                       EachFlow = ZoneFlow div erlang:length(Childs),
-                       Zone2 = Zone#{obtained := Obtained + ZoneFlow},
-                       NodeAcc2 = NodeAcc#{ZoneId := Zone2},
-                       dispatch_burst_to_buckets(Childs, EachFlow, NodeAcc2)
+                       case erlang:min(InFlow, ZoneBurst) of
+                           0 -> NodeAcc;
+                           ZoneFlow ->
+                               EachFlow = ZoneFlow / erlang:length(Childs),
+                               {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc),
+                               Zone2 = Zone#{obtained := Obtained + Alloced},
+                               NodeAcc2#{ZoneId := Zone2}
+                       end
                end,
     State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.

 -spec dispatch_burst_to_buckets(list(node_id()),
-                                non_neg_integer(), nodes()) -> nodes().
-dispatch_burst_to_buckets(Childs, InFlow, Nodes) ->
-    Each = fun(ChildId, NodeAcc) ->
-                   #{counter := Counter,
-                     index := Index,
-                     obtained := Obtained} = Bucket = maps:get(ChildId, NodeAcc),
-                   counters:add(Counter, Index, InFlow),
-                   NodeAcc#{ChildId := Bucket#{obtained := Obtained + InFlow}}
-           end,
-    lists:foldl(Each, Nodes, Childs).
+                                float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
+dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
+    #{counter := Counter,
+      index := Index,
+      obtained := Obtained} = Bucket = maps:get(ChildId, Nodes),
+    {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
+    counters:add(Counter, Index, Inc),
+    Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}},
+    dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2);
+dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
+    {Alloced, Nodes}.

 -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
 init_tree(Type, State) ->
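Below is a hypothetical sketch (not part of the commit; burst_demo/0 and the flow value 0.75 are made up) of the pattern the rewritten dispatch_burst_to_buckets/4 relies on: each bucket carries its own correction, and only the integer part of the accumulated flow ever reaches the shared counter.

%% Hypothetical sketch: push a fractional per-bucket flow of 0.75 into two
%% buckets for two rounds; whole tokens only appear once the carry crosses 1.0.
burst_demo() ->
    CRef = counters:new(2, []),
    Buckets0 = [#{index => I, correction => 0.0} || I <- [1, 2]],
    Round =
        fun(Bucket = #{index := Ix}) ->
                {Inc, Bucket1} = emqx_limiter_correction:add(0.75, Bucket),
                ok = counters:add(CRef, Ix, Inc),
                Bucket1
        end,
    Buckets1 = lists:map(Round, Buckets0),   %% round 1: floor(0.75) = 0 per bucket
    _Buckets2 = lists:map(Round, Buckets1),  %% round 2: floor(1.5)  = 1 per bucket
    [counters:get(CRef, I) || I <- [1, 2]].  %% -> [1, 1]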


@@ -146,7 +146,6 @@
         {counter, 'messages.dropped.expired'},        % QoS2 Messages expired
         {counter, 'messages.dropped.no_subscribers'}, % Messages dropped
         {counter, 'messages.forward'},   % Messages forward
-        {counter, 'messages.retained'},  % Messages retained
         {counter, 'messages.delayed'},   % Messages delayed
         {counter, 'messages.delivered'}, % Messages delivered
         {counter, 'messages.acked'}      % Messages acked
@@ -207,7 +206,7 @@ stop() -> gen_server:stop(?SERVER).
 %% BACKW: v4.3.0
 upgrade_retained_delayed_counter_type() ->
-    Ks = ['messages.retained', 'messages.delayed'],
+    Ks = ['messages.delayed'],
     gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity).

 %%--------------------------------------------------------------------
@@ -556,7 +555,7 @@ reserved_idx('messages.dropped') -> 109;
 reserved_idx('messages.dropped.expired') -> 110;
 reserved_idx('messages.dropped.no_subscribers') -> 111;
 reserved_idx('messages.forward') -> 112;
-reserved_idx('messages.retained') -> 113;
+%%reserved_idx('messages.retained') -> 113; %% keep the index, new metrics can use this
 reserved_idx('messages.delayed') -> 114;
 reserved_idx('messages.delivered') -> 115;
 reserved_idx('messages.acked') -> 116;
@@ -592,4 +591,3 @@ reserved_idx('olp.gc') -> 303;
 reserved_idx('olp.new_conn') -> 304;
 reserved_idx(_) -> undefined.


@@ -71,19 +71,10 @@ t_inc_dec(_) ->
     with_metrics_server(
       fun() ->
              ?assertEqual(0, emqx_metrics:val('bytes.received')),
-             ?assertEqual(0, emqx_metrics:val('messages.retained')),
              ok = emqx_metrics:inc('bytes.received'),
              ok = emqx_metrics:inc('bytes.received', 2),
              ok = emqx_metrics:inc('bytes.received', 2),
-             ?assertEqual(5, emqx_metrics:val('bytes.received')),
-             ok = emqx_metrics:inc('messages.retained', 2),
-             ok = emqx_metrics:inc('messages.retained', 2),
-             ?assertEqual(4, emqx_metrics:val('messages.retained')),
-             ok = emqx_metrics:dec('messages.retained'),
-             ok = emqx_metrics:dec('messages.retained', 1),
-             ?assertEqual(2, emqx_metrics:val('messages.retained')),
-             ok = emqx_metrics:set('messages.retained', 3),
-             ?assertEqual(3, emqx_metrics:val('messages.retained'))
+             ?assertEqual(5, emqx_metrics:val('bytes.received'))
      end).

 t_inc_recv(_) ->
@@ -162,21 +153,12 @@ t_trans(_) ->
              ok = emqx_metrics:trans(inc, 'bytes.received'),
              ok = emqx_metrics:trans(inc, 'bytes.received', 2),
              ?assertEqual(0, emqx_metrics:val('bytes.received')),
-             ok = emqx_metrics:trans(inc, 'messages.retained', 2),
-             ok = emqx_metrics:trans(inc, 'messages.retained', 2),
-             ?assertEqual(0, emqx_metrics:val('messages.retained')),
              ok = emqx_metrics:commit(),
              ?assertEqual(3, emqx_metrics:val('bytes.received')),
-             ?assertEqual(4, emqx_metrics:val('messages.retained')),
-             ok = emqx_metrics:trans(dec, 'messages.retained'),
-             ok = emqx_metrics:trans(dec, 'messages.retained', 1),
-             ?assertEqual(4, emqx_metrics:val('messages.retained')),
-             ok = emqx_metrics:commit(),
-             ?assertEqual(2, emqx_metrics:val('messages.retained'))
+             ok = emqx_metrics:commit()
      end).

 with_metrics_server(Fun) ->
     {ok, _} = emqx_metrics:start_link(),
     _ = Fun(),
     ok = emqx_metrics:stop().


@@ -288,7 +288,7 @@ systopic_metrics() ->
              <<"messages/qos2/received">>, <<"messages/qos2/sent">>,
              <<"messages/publish">>, <<"messages/dropped">>,
              <<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>,
-             <<"messages/forward">>, <<"messages/retained">>,
+             <<"messages/forward">>,
              <<"messages/delayed">>, <<"messages/delivered">>,
              <<"messages/acked">>],
     ?LET({Nodename, T},


@@ -19,6 +19,7 @@
 -include("emqx_authn.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("typerefl/include/types.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").

 -behaviour(hocon_schema).
 -behaviour(emqx_authentication).
@@ -77,7 +78,7 @@ validations() ->
     ].

 url(type) -> binary();
-url(validator) -> [fun check_url/1];
+url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
 url(nullable) -> false;
 url(_) -> undefined.
@@ -118,16 +119,16 @@ create(_AuthenticatorID, Config) ->
     create(Config).

 create(#{method := Method,
-         url := URL,
+         url := RawURL,
          headers := Headers,
          body := Body,
          request_timeout := RequestTimeout} = Config) ->
-    #{path := Path,
-      query := Query} = URIMap = parse_url(URL),
+    {BsaeUrlWithPath, Query} = parse_fullpath(RawURL),
+    URIMap = parse_url(BsaeUrlWithPath),
     ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
     State = #{method => Method,
-              path => Path,
-              base_query => cow_qs:parse_qs(list_to_binary(Query)),
+              path => maps:get(path, URIMap),
+              base_query => cow_qs:parse_qs(to_bin(Query)),
               headers => maps:to_list(Headers),
               body => maps:to_list(Body),
               request_timeout => RequestTimeout,
@@ -204,11 +205,8 @@ destroy(#{resource_id := ResourceId}) ->
 %% Internal functions
 %%--------------------------------------------------------------------

-check_url(URL) ->
-    case emqx_http_lib:uri_parse(URL) of
-        {ok, _} -> true;
-        {error, _} -> false
-    end.
+parse_fullpath(RawURL) ->
+    cow_http:parse_fullpath(to_bin(RawURL)).

 check_body(Body) ->
     lists:all(
@@ -234,7 +232,8 @@ transform_header_name(Headers) ->
     end, #{}, Headers).

 check_ssl_opts(Conf) ->
-    case parse_url(get_conf_val("url", Conf)) of
+    {BaseUrlWithPath, _Query} = parse_fullpath(get_conf_val("url", Conf)),
+    case parse_url(BaseUrlWithPath) of
         #{scheme := https} ->
             case get_conf_val("ssl.enable", Conf) of
                 true -> ok;
@@ -264,12 +263,13 @@ generate_request(Credential, #{method := Method,
                                headers := Headers,
                                body := Body0}) ->
     Body = replace_placeholders(Body0, Credential),
+    NBaseQuery = replace_placeholders(BaseQuery, Credential),
     case Method of
         get ->
-            NPath = append_query(Path, BaseQuery ++ Body),
+            NPath = append_query(Path, NBaseQuery ++ Body),
             {NPath, Headers};
         post ->
-            NPath = append_query(Path, BaseQuery),
+            NPath = append_query(Path, NBaseQuery),
             ContentType = proplists:get_value(<<"content-type">>, Headers),
             NBody = serialize_body(ContentType, Body),
             {NPath, Headers, NBody}


@@ -412,9 +412,22 @@ get_raw_sources() ->
     RawSources = emqx:get_raw_config([authorization, sources], []),
     Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}},
     Conf = #{<<"sources">> => RawSources},
-    #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf,
-                                                           #{only_fill_defaults => true}),
-    Sources.
+    #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, #{only_fill_defaults => true}),
+    merge_default_headers(Sources).
+
+merge_default_headers(Sources) ->
+    lists:map(fun(Source) ->
+                  Convert =
+                      case Source of
+                          #{<<"method">> := <<"get">>} ->
+                              emqx_authz_schema:headers_no_content_type(converter);
+                          #{<<"method">> := <<"post">>} ->
+                              emqx_authz_schema:headers(converter);
+                          _ -> fun(H) -> H end
+                      end,
+                  Headers = Convert(maps:get(<<"headers">>, Source, #{})),
+                  Source#{<<"headers">> => Headers}
+              end, Sources).

 get_raw_source(Type) ->
     lists:filter(fun (#{<<"type">> := T}) ->


@@ -29,6 +29,7 @@
         , destroy/1
         , dry_run/1
         , authorize/4
+        , parse_url/1
         ]).

 -ifdef(TEST).
@@ -39,62 +40,29 @@
 description() ->
     "AuthZ with http".

-init(Source) ->
-    case emqx_authz_utils:create_resource(emqx_connector_http, Source) of
+init(Config) ->
+    NConfig = parse_config(Config),
+    case emqx_authz_utils:create_resource(emqx_connector_http, NConfig) of
         {error, Reason} -> error({load_config_error, Reason});
-        {ok, Id} -> Source#{annotations => #{id => Id}}
+        {ok, Id} -> NConfig#{annotations => #{id => Id}}
     end.

 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove_local(Id).

-dry_run(Source) ->
-    emqx_resource:create_dry_run_local(emqx_connector_http, Source).
+dry_run(Config) ->
+    emqx_resource:create_dry_run_local(emqx_connector_http, parse_config(Config)).

-authorize(Client, PubSub, Topic,
-          #{type := http,
-            query := Query,
-            path := Path,
-            headers := Headers,
-            method := Method,
-            request_timeout := RequestTimeout,
-            annotations := #{id := ResourceID}
-           } = Source) ->
-    Request = case Method of
-                  get ->
-                      Path1 = replvar(
-                                Path ++ "?" ++ Query,
-                                PubSub,
-                                Topic,
-                                maps:to_list(Client),
-                                fun var_uri_encode/1),
-                      {Path1, maps:to_list(Headers)};
-                  _ ->
-                      Body0 = maps:get(body, Source, #{}),
-                      Body1 = replvar_deep(
-                                Body0,
-                                PubSub,
-                                Topic,
-                                maps:to_list(Client),
-                                fun var_bin_encode/1),
-                      Body2 = serialize_body(
-                                maps:get(<<"content-type">>, Headers, <<"application/json">>),
-                                Body1),
-                      Path1 = replvar(
-                                Path,
-                                PubSub,
-                                Topic,
-                                maps:to_list(Client),
-                                fun var_uri_encode/1),
-                      {Path1, maps:to_list(Headers), Body2}
-              end,
-    HttpResult = emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}),
-    case HttpResult of
+authorize( Client
+         , PubSub
+         , Topic
+         , #{ type := http
+            , annotations := #{id := ResourceID}
+            , method := Method
+            , request_timeout := RequestTimeout
+            } = Config) ->
+    Request = generate_request(PubSub, Topic, Client, Config),
+    case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
         {ok, 200, _Headers} ->
             {matched, allow};
         {ok, 204, _Headers} ->
@@ -112,12 +80,75 @@ authorize(Client, PubSub, Topic,
             ignore
     end.

+parse_config(#{ url := URL
+              , method := Method
+              , headers := Headers
+              , request_timeout := ReqTimeout
+              } = Conf) ->
+    {BaseURLWithPath, Query} = parse_fullpath(URL),
+    BaseURLMap = parse_url(BaseURLWithPath),
+    Conf#{ method => Method
+         , base_url => maps:remove(query, BaseURLMap)
+         , base_query => cow_qs:parse_qs(bin(Query))
+         , body => maps:get(body, Conf, #{})
+         , headers => Headers
+         , request_timeout => ReqTimeout
+         }.
+
+parse_fullpath(RawURL) ->
+    cow_http:parse_fullpath(bin(RawURL)).
+
+parse_url(URL)
+  when URL =:= undefined ->
+    #{};
+parse_url(URL) ->
+    {ok, URIMap} = emqx_http_lib:uri_parse(URL),
+    case maps:get(query, URIMap, undefined) of
+        undefined ->
+            URIMap#{query => ""};
+        _ ->
+            URIMap
+    end.
+
+generate_request( PubSub
+                , Topic
+                , Client
+                , #{ method := Method
+                   , base_url := #{path := Path}
+                   , base_query := BaseQuery
+                   , headers := Headers
+                   , body := Body0
+                   }) ->
+    Body = replace_placeholders(maps:to_list(Body0), PubSub, Topic, Client),
+    NBaseQuery = replace_placeholders(BaseQuery, PubSub, Topic, Client),
+    case Method of
+        get ->
+            NPath = append_query(Path, NBaseQuery ++ Body),
+            {NPath, Headers};
+        _ ->
+            NPath = append_query(Path, NBaseQuery),
+            NBody = serialize_body(
+                      proplists:get_value(<<"Accept">>, Headers, <<"application/json">>),
+                      Body
+                     ),
+            {NPath, Headers, NBody}
+    end.
+
+append_query(Path, []) ->
+    Path;
+append_query(Path, Query) ->
+    Path ++ "?" ++ binary_to_list(query_string(Query)).
+
 query_string(Body) ->
-    query_string(maps:to_list(Body), []).
+    query_string(Body, []).

 query_string([], Acc) ->
-    <<$&, Str/binary>> = iolist_to_binary(lists:reverse(Acc)),
-    Str;
+    case iolist_to_binary(lists:reverse(Acc)) of
+        <<$&, Str/binary>> ->
+            Str;
+        <<>> ->
+            <<>>
+    end;
 query_string([{K, V} | More], Acc) ->
     query_string( More
                 , [ ["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)]
@@ -128,67 +159,36 @@ serialize_body(<<"application/json">>, Body) ->
 serialize_body(<<"application/x-www-form-urlencoded">>, Body) ->
     query_string(Body).

+replace_placeholders(KVs, PubSub, Topic, Client) ->
+    replace_placeholders(KVs, PubSub, Topic, Client, []).
+
-replvar_deep(Map, PubSub, Topic, Vars, VarEncode) when is_map(Map) ->
-    maps:from_list(
-      lists:map(
-        fun({Key, Value}) ->
-                {replvar(Key, PubSub, Topic, Vars, VarEncode),
-                 replvar_deep(Value, PubSub, Topic, Vars, VarEncode)}
-        end,
-        maps:to_list(Map)));
-replvar_deep(List, PubSub, Topic, Vars, VarEncode) when is_list(List) ->
-    lists:map(
-      fun(Value) ->
-              replvar_deep(Value, PubSub, Topic, Vars, VarEncode)
-      end,
-      List);
-replvar_deep(Number, _PubSub, _Topic, _Vars, _VarEncode) when is_number(Number) ->
-    Number;
-replvar_deep(Binary, PubSub, Topic, Vars, VarEncode) when is_binary(Binary) ->
-    replvar(Binary, PubSub, Topic, Vars, VarEncode).
+replace_placeholders([], _PubSub, _Topic, _Client, Acc) ->
+    lists:reverse(Acc);
+replace_placeholders([{K, V0} | More], PubSub, Topic, Client, Acc) ->
+    case replace_placeholder(V0, PubSub, Topic, Client) of
+        undefined ->
+            error({cannot_get_variable, V0});
+        V ->
+            replace_placeholders(More, PubSub, Topic, Client, [{bin(K), bin(V)} | Acc])
+    end.

-replvar(Str0, PubSub, Topic, [], VarEncode) ->
-    NTopic = emqx_http_lib:uri_encode(Topic),
-    Str1 = re:replace(Str0, emqx_authz:ph_to_re(?PH_S_TOPIC),
-                      VarEncode(NTopic), [global, {return, binary}]),
-    re:replace(Str1, emqx_authz:ph_to_re(?PH_S_ACTION),
-               VarEncode(PubSub), [global, {return, binary}]);
-replvar(Str, PubSub, Topic, [{username, Username} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_USERNAME),
-                      VarEncode(Username), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-replvar(Str, PubSub, Topic, [{clientid, Clientid} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_CLIENTID),
-                      VarEncode(Clientid), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-replvar(Str, PubSub, Topic, [{peerhost, IpAddress} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PEERHOST),
-                      VarEncode(inet_parse:ntoa(IpAddress)), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-replvar(Str, PubSub, Topic, [{protocol, Protocol} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PROTONAME),
-                      VarEncode(Protocol), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-replvar(Str, PubSub, Topic, [{mountpoint, Mountpoint} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT),
-                      VarEncode(Mountpoint), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-replvar(Str, PubSub, Topic, [_Unknown | Rest], VarEncode) ->
-    replvar(Str, PubSub, Topic, Rest, VarEncode).
-
-var_uri_encode(S) ->
-    emqx_http_lib:uri_encode(bin(S)).
-
-var_bin_encode(S) ->
-    bin(S).
+replace_placeholder(?PH_USERNAME, _PubSub, _Topic, Client) ->
+    bin(maps:get(username, Client, undefined));
+replace_placeholder(?PH_CLIENTID, _PubSub, _Topic, Client) ->
+    bin(maps:get(clientid, Client, undefined));
+replace_placeholder(?PH_HOST, _PubSub, _Topic, Client) ->
+    inet_parse:ntoa(maps:get(peerhost, Client, undefined));
+replace_placeholder(?PH_PROTONAME, _PubSub, _Topic, Client) ->
+    bin(maps:get(protocol, Client, undefined));
+replace_placeholder(?PH_MOUNTPOINT, _PubSub, _Topic, Client) ->
+    bin(maps:get(mountpoint, Client, undefined));
+replace_placeholder(?PH_TOPIC, _PubSub, Topic, _Client) ->
+    bin(emqx_http_lib:uri_encode(Topic));
+replace_placeholder(?PH_ACTION, PubSub, _Topic, _Client) ->
+    bin(PubSub);
+replace_placeholder(Constant, _, _, _) ->
+    Constant.

 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(B) when is_binary(B) -> B;


@@ -17,6 +17,7 @@
 -module(emqx_authz_schema).

 -include_lib("typerefl/include/types.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").

 -reflect_type([ permission/0
               , action/0
@@ -28,9 +29,15 @@
 -export([ namespace/0
         , roots/0
         , fields/1
+        , validations/0
+        ]).
+
+-export([ headers_no_content_type/1
+        , headers/1
         ]).

 -import(emqx_schema, [mk_duration/2]).
+-include_lib("hocon/include/hoconsc.hrl").

 %%--------------------------------------------------------------------
 %% Hocon Schema
@@ -138,11 +145,10 @@ fields(redis_cluster) ->
 http_common_fields() ->
     [ {type, #{type => http}}
     , {enable, #{type => boolean(), default => true}}
+    , {url, fun url/1}
     , {request_timeout, mk_duration("request timeout", #{default => "30s"})}
     , {body, #{type => map(), nullable => true}}
-    , {path, #{type => string(), default => ""}}
-    , {query, #{type => string(), default => ""}}
-    ] ++ emqx_connector_http:fields(config).
+    ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).

 mongo_common_fields() ->
     [ {collection, #{type => atom()}}
@@ -152,22 +158,32 @@ mongo_common_fields() ->
                  default => true}}
     ].

-headers(type) -> map();
+validations() ->
+    [ {check_ssl_opts, fun check_ssl_opts/1}
+    , {check_headers, fun check_headers/1}
+    ].
+
+headers(type) -> list({binary(), binary()});
 headers(converter) ->
     fun(Headers) ->
-        maps:merge(default_headers(), transform_header_name(Headers))
+        maps:to_list(maps:merge(default_headers(), transform_header_name(Headers)))
     end;
 headers(default) -> default_headers();
 headers(_) -> undefined.

-headers_no_content_type(type) -> map();
+headers_no_content_type(type) -> list({binary(), binary()});
 headers_no_content_type(converter) ->
     fun(Headers) ->
-        maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
+        maps:to_list(maps:merge(default_headers_no_content_type(), transform_header_name(Headers)))
     end;
 headers_no_content_type(default) -> default_headers_no_content_type();
 headers_no_content_type(_) -> undefined.

+url(type) -> binary();
+url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
+url(nullable) -> false;
+url(_) -> undefined.
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -190,6 +206,28 @@ transform_header_name(Headers) ->
                       maps:put(K, V, Acc)
               end, #{}, Headers).

+check_ssl_opts(Conf)
+  when Conf =:= #{} ->
+    true;
+check_ssl_opts(Conf) ->
+    case emqx_authz_http:parse_url(hocon_schema:get_value("config.url", Conf)) of
+        #{scheme := https} ->
+            case hocon_schema:get_value("config.ssl.enable", Conf) of
+                true -> ok;
+                false -> false
+            end;
+        #{scheme := http} ->
+            ok
+    end.
+
+check_headers(Conf)
+  when Conf =:= #{} ->
+    true;
+check_headers(Conf) ->
+    Method = to_bin(hocon_schema:get_value("config.method", Conf)),
+    Headers = hocon_schema:get_value("config.headers", Conf),
+    Method =:= <<"post">> orelse (not lists:member(<<"content-type">>, Headers)).
+
 union_array(Item) when is_list(Item) ->
     hoconsc:array(hoconsc:union(Item)).
@@ -225,3 +263,9 @@ to_list(A) when is_atom(A) ->
 to_list(B) when is_binary(B) ->
     binary_to_list(B).
+
+to_bin(A) when is_atom(A) ->
+    atom_to_binary(A);
+to_bin(B) when is_binary(B) ->
+    B;
+to_bin(L) when is_list(L) ->
+    list_to_binary(L).


@@ -30,9 +30,7 @@
 -define(SOURCE1, #{<<"type">> => <<"http">>,
                    <<"enable">> => true,
-                   <<"base_url">> => <<"https://fake.com:443/">>,
-                   <<"path">> => <<"foo">>,
-                   <<"query">> => <<"a=b">>,
+                   <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>,
                    <<"headers">> => #{},
                    <<"method">> => <<"get">>,
                    <<"request_timeout">> => <<"5s">>


@@ -262,11 +262,20 @@ preprocess_request(#{
      , request_timeout => maps:get(request_timeout, Req, 30000)
      }.

-preproc_headers(Headers) ->
+preproc_headers(Headers) when is_map(Headers) ->
     maps:fold(fun(K, V, Acc) ->
-                  Acc#{emqx_plugin_libs_rule:preproc_tmpl(bin(K)) =>
-                       emqx_plugin_libs_rule:preproc_tmpl(bin(V))}
-              end, #{}, Headers).
+                  [{
+                    emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
+                    emqx_plugin_libs_rule:preproc_tmpl(bin(V))
+                   } | Acc]
+              end, [], Headers);
+preproc_headers(Headers) when is_list(Headers) ->
+    lists:map(fun({K, V}) ->
+                  {
+                   emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
+                   emqx_plugin_libs_rule:preproc_tmpl(bin(V))
+                  }
+              end, Headers).

 process_request(#{
         method := MethodTks,
@@ -278,7 +287,7 @@ process_request(#{
     Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg))
          , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg)
          , body => process_request_body(BodyTks, Msg)
-         , headers => maps:to_list(proc_headers(HeadersTks, Msg))
+         , headers => proc_headers(HeadersTks, Msg)
         , request_timeout => ReqTimeout
         }.
@@ -288,10 +297,12 @@ process_request_body(BodyTks, Msg) ->
     emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg).

 proc_headers(HeaderTks, Msg) ->
-    maps:fold(fun(K, V, Acc) ->
-                  Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) =>
-                       emqx_plugin_libs_rule:proc_tmpl(V, Msg)}
-              end, #{}, HeaderTks).
+    lists:map(fun({K, V}) ->
+                  {
+                   emqx_plugin_libs_rule:proc_tmpl(K, Msg),
+                   emqx_plugin_libs_rule:proc_tmpl(V, Msg)
+                  }
+              end, HeaderTks).

 make_method(M) when M == <<"POST">>; M == <<"post">> -> post;
 make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;


@@ -221,27 +221,26 @@ t_mqtt_conn_bridge_ingress(_) ->
     %% ... and a MQTT bridge, using POST
     %% we bind this bridge to the connector created just now
-    timer:sleep(50),
     {ok, 201, Bridge} = request(post, uri(["bridges"]),
                                 ?MQTT_BRIDGE_INGRESS(ConnctorID)#{
                                     <<"type">> => ?CONNECTR_TYPE,
                                     <<"name">> => ?BRIDGE_NAME_INGRESS
                                 }),
     #{ <<"id">> := BridgeIDIngress
      , <<"type">> := <<"mqtt">>
-     , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
+    wait_for_resource_ready(BridgeIDIngress, 5),

     %% we now test if the bridge works as expected
     RemoteTopic = <<"remote_topic/1">>,
     LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(LocalTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'remote' broker, as we have only one broker,
     %% the remote broker is also the local one.
-    wait_for_resource_ready(BridgeIDIngress, 5),
     emqx:publish(emqx_message:make(RemoteTopic, Payload)),
     %% we should receive a message on the local broker, with specified topic
     ?assert(
@@ -295,22 +294,21 @@ t_mqtt_conn_bridge_egress(_) ->
                                     <<"type">> => ?CONNECTR_TYPE,
                                     <<"name">> => ?BRIDGE_NAME_EGRESS
                                 }),
     #{ <<"id">> := BridgeIDEgress
      , <<"type">> := ?CONNECTR_TYPE
      , <<"name">> := ?BRIDGE_NAME_EGRESS
-     , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
+    wait_for_resource_ready(BridgeIDEgress, 5),

     %% we now test if the bridge works as expected
     LocalTopic = <<"local_topic/1">>,
     RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
-    wait_for_resource_ready(BridgeIDEgress, 5),
     emqx:publish(emqx_message:make(LocalTopic, Payload)),

     %% we should receive a message on the "remote" broker, with specified topic
@@ -369,10 +367,9 @@ t_mqtt_conn_update(_) ->
     #{ <<"id">> := BridgeIDEgress
      , <<"type">> := <<"mqtt">>
      , <<"name">> := ?BRIDGE_NAME_EGRESS
-     , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
-    wait_for_resource_ready(BridgeIDEgress, 2),
+    wait_for_resource_ready(BridgeIDEgress, 5),

     %% then we try to update 'server' of the connector, to an unavailable IP address
     %% the update should fail because of 'unreachable' or 'connrefused'
@@ -424,6 +421,7 @@ t_mqtt_conn_update2(_) ->
     {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
                            ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    wait_for_resource_ready(BridgeIDEgress, 5),
     ?assertMatch(#{ <<"id">> := BridgeIDEgress
                   , <<"status">> := <<"connected">>
                   }, jsx:decode(BridgeStr)),
@@ -454,7 +452,7 @@ t_mqtt_conn_update3(_) ->
     #{ <<"id">> := BridgeIDEgress
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
-    wait_for_resource_ready(BridgeIDEgress, 2),
+    wait_for_resource_ready(BridgeIDEgress, 5),

     %% delete the connector should fail because it is in use by a bridge
     {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
@@ -505,6 +503,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(LocalTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'remote' broker, as we have only one broker,
     %% the remote broker is also the local one.
     wait_for_resource_ready(BridgeIDIngress, 5),
@@ -570,6 +569,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
     wait_for_resource_ready(BridgeIDEgress, 5),
@@ -593,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     RuleTopic = <<"t/1">>,
     RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
     emqx:subscribe(RemoteTopic2),
+    timer:sleep(100),
     wait_for_resource_ready(BridgeIDEgress, 5),
     emqx:publish(emqx_message:make(RuleTopic, Payload2)),
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),


@@ -436,6 +436,7 @@ typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, examp
 typename_to_spec("bytesize()", _Mod) -> #{type => string, example => <<"32MB">>};
 typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>};
 typename_to_spec("map()", _Mod) -> #{type => object, example => #{}};
+typename_to_spec("{binary(), binary()}", _Mod) -> #{type => object, example => #{}};
 typename_to_spec("comma_separated_list()", _Mod) ->
     #{type => string, example => <<"item1,item2">>};
 typename_to_spec("comma_separated_atoms()", _Mod) ->


@@ -70,7 +70,7 @@
 -define(DEFAULT_IDLE_TIMEOUT, 30000).
 -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
 -define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,
-                              message_queue_len => 32000}).
+                              max_message_queue_len => 32000}).

 -elvis([{elvis_style, god_modules, disable}]).


@@ -235,7 +235,7 @@ accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) ->
 %% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...}
 do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn;
-do_accumulation_metrics(MetricsIn, MetricsAcc) ->
+do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) ->
     Keys = maps:keys(MetricsIn),
     lists:foldl(fun(Key, Acc) ->
                     InVal = maps:get(Key, MetricsIn),


@@ -22,9 +22,11 @@
 %% be used by the prometheus application
 -behaviour(prometheus_collector).

+-include("emqx_prometheus.hrl").
 -include_lib("prometheus/include/prometheus.hrl").
 -include_lib("prometheus/include/prometheus_model.hrl").
+-include_lib("emqx/include/logger.hrl").

 -import(prometheus_model_helpers,
         [ create_mf/5
@@ -32,6 +34,16 @@
         , counter_metric/1
         ]).

+-export([ update/1
+        , start/0
+        , stop/0
+        , restart/0
+        % for rpc
+        , do_start/0
+        , do_stop/0
+        , do_restart/0
+        ]).
+
 %% APIs
 -export([start_link/1]).
@@ -58,6 +70,56 @@
 -record(state, {push_gateway, timer, interval}).

+%%--------------------------------------------------------------------
+%% update new config
+update(Config) ->
+    case emqx_conf:update([prometheus], Config,
+                          #{rawconf_with_defaults => true, override_to => cluster}) of
+        {ok, #{raw_config := NewConfigRows}} ->
+            case maps:get(<<"enable">>, Config, true) of
+                true ->
+                    ok = restart();
+                false ->
+                    ok = stop()
+            end,
+            {ok, NewConfigRows};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+start() -> cluster_call(do_start, []).
+stop() -> cluster_call(do_stop, []).
+restart() -> cluster_call(do_restart, []).
+
+do_start() ->
+    emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])).
+
+do_stop() ->
+    case emqx_prometheus_sup:stop_child(?APP) of
+        ok ->
+            ok;
+        {error, not_found} ->
+            ok
+    end.
+
+do_restart() ->
+    ok = do_stop(),
+    ok = do_start(),
+    ok.
+
+cluster_call(F, A) ->
+    [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()],
+    ok.
+
+rpc_call(N, F, A) ->
+    case rpc:call(N, ?MODULE, F, A, 5000) of
+        {badrpc, R} ->
+            ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]),
+            {error, {badrpc, R}};
+        Result ->
+            Result
+    end.
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------


@@ -67,16 +67,13 @@ prometheus(get, _Params) ->
     {200, emqx:get_raw_config([<<"prometheus">>], #{})};

 prometheus(put, #{body := Body}) ->
-    {ok, Config} = emqx:update_config([prometheus], Body),
-    case maps:get(<<"enable">>, Body) of
-        true ->
-            _ = emqx_prometheus_sup:stop_child(?APP),
-            emqx_prometheus_sup:start_child(?APP, maps:get(config, Config));
-        false ->
-            _ = emqx_prometheus_sup:stop_child(?APP),
-            ok
-    end,
-    {200, emqx:get_raw_config([<<"prometheus">>], #{})}.
+    case emqx_prometheus:update(Body) of
+        {ok, NewConfig} ->
+            {200, NewConfig};
+        {error, Reason} ->
+            Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
+            {500, 'INTERNAL_ERROR', Message}
+    end.

 stats(get, #{headers := Headers}) ->
     Type =


@@ -37,7 +37,9 @@
         , clean/0
         , delete/1
         , page_read/3
-        , post_config_update/5]).
+        , post_config_update/5
+        , stats_fun/0
+        ]).

 %% gen_server callbacks
 -export([ init/1
@@ -69,6 +71,7 @@
 -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
 -callback clear_expired(context()) -> ok.
 -callback clean(context()) -> ok.
+-callback size(context()) -> non_neg_integer().

 %%--------------------------------------------------------------------
 %% Hook API
@@ -185,6 +188,9 @@ post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) ->
 call(Req) ->
     gen_server:call(?MODULE, Req, infinity).

+stats_fun() ->
+    gen_server:cast(?MODULE, ?FUNCTION_NAME).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
@@ -226,6 +232,12 @@ handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.

+handle_cast(stats_fun, #{context := Context} = State) ->
+    Mod = get_backend_module(),
+    Size = Mod:size(Context),
+    emqx_stats:setstat('retained.count', 'retained.max', Size),
+    {noreply, State};
+
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
     {noreply, State}.
@@ -485,8 +497,11 @@ close_resource(_) ->
 load(Context) ->
     _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}),
     _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}),
+    emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
     ok.

 unload() ->
     emqx:unhook('message.publish', {?MODULE, on_message_publish}),
-    emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}).
+    emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
+    emqx_stats:cancel_update(emqx_retainer_stats),
+    ok.


@@ -30,7 +30,9 @@
         , page_read/4
         , match_messages/3
         , clear_expired/1
-        , clean/1]).
+        , clean/1
+        , size/1
+        ]).

 -export([create_resource/1]).
@@ -73,7 +75,6 @@ store_retained(_, Msg =#message{topic = Topic}) ->
     ExpiryTime = emqx_retainer:get_expiry_time(Msg),
     case is_table_full() of
         false ->
-            ok = emqx_metrics:inc('messages.retained'),
             mria:dirty_write(?TAB,
                              #retained{topic = topic2tokens(Topic),
                                        msg = Msg,
@@ -155,6 +156,10 @@ match_messages(_, Topic, Cursor) ->
 clean(_) ->
     _ = mria:clear_table(?TAB),
     ok.

+size(_) ->
+    table_size().
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------


@@ -12,8 +12,8 @@ emqx_slow_subs {
     ## The eviction time of the record, which in the statistics record table
     ##
-    ## Default: 5m
-    expire_interval = 5m
+    ## Default: 300ms
+    expire_interval = 300ms

     ## The maximum number of records in the slow subscription statistics record table
     ##


@@ -25,6 +25,18 @@
 -include("emqx_statsd.hrl").

+-include_lib("emqx/include/logger.hrl").
+
+-export([ update/1
+        , start/0
+        , stop/0
+        , restart/0
+        %% for rpc
+        , do_start/0
+        , do_stop/0
+        , do_restart/0
+        ]).
+
 %% Interface
 -export([start_link/1]).
@@ -44,6 +56,52 @@
          estatsd_pid :: pid()
         }).

+update(Config) ->
+    case emqx_conf:update([statsd],
+                          Config,
+                          #{rawconf_with_defaults => true, override_to => cluster}) of
+        {ok, #{raw_config := NewConfigRows}} ->
+            ok = stop(),
+            case maps:get(<<"enable">>, Config, true) of
+                true ->
+                    ok = start();
+                false ->
+                    ignore
+            end,
+            {ok, NewConfigRows};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+start() -> cluster_call(do_start, []).
+stop() -> cluster_call(do_stop, []).
+restart() -> cluster_call(do_restart, []).
+
+do_start() ->
+    emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})).
+
+do_stop() ->
+    emqx_statsd_sup:ensure_child_stopped(?APP).
+
+do_restart() ->
+    ok = do_stop(),
+    ok = do_start(),
+    ok.
+
+cluster_call(F, A) ->
+    [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()],
+    ok.
+
+rpc_call(N, F, A) ->
+    case rpc:call(N, ?MODULE, F, A, 5000) of
+        {badrpc, R} ->
+            ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]),
+            {error, {badrpc, R}};
+        Result ->
+            Result
+    end.
+
 start_link(Opts) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).


@@ -55,17 +55,8 @@ statsd(get, _Params) ->
     {200, emqx:get_raw_config([<<"statsd">>], #{})};

 statsd(put, #{body := Body}) ->
-    case emqx:update_config([statsd],
-                            Body,
-                            #{rawconf_with_defaults => true, override_to => cluster}) of
-        {ok, #{raw_config := NewConfig, config := Config}} ->
-            ok = emqx_statsd_sup:ensure_child_stopped(?APP),
-            case maps:get(<<"enable">>, Body) of
-                true ->
-                    ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config));
-                false ->
-                    ok
-            end,
+    case emqx_statsd:update(Body) of
+        {ok, NewConfig} ->
             {200, NewConfig};
         {error, Reason} ->
             Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),


@@ -122,42 +122,42 @@ profiles() ->
        , {relx, relx(Vsn, cloud, bin, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx doc"}]}
        ]}
    , {'emqx-pkg',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, cloud, pkg, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx-pkg doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-pkg doc"}]}
        ]}
    , {'emqx-enterprise',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, cloud, bin, ee)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ee)}
-       , {post_hooks, [{compile, "./build emqx-enterprise doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise doc"}]}
        ]}
    , {'emqx-enterprise-pkg',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, cloud, pkg, ee)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ee)}
-       , {post_hooks, [{compile, "./build emqx-enterprise-pkg doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise-pkg doc"}]}
        ]}
    , {'emqx-edge',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, edge, bin, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx-edge doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge doc"}]}
        ]}
    , {'emqx-edge-pkg',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, edge, pkg, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx-edge-pkg doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge-pkg doc"}]}
        ]}
    , {check,
        [ {erl_opts, common_compile_opts()}