diff --git a/.github/workflows/run_api_tests.yaml b/.github/workflows/run_api_tests.yaml index 796bcf451..084704537 100644 --- a/.github/workflows/run_api_tests.yaml +++ b/.github/workflows/run_api_tests.yaml @@ -61,7 +61,7 @@ jobs: - uses: actions/checkout@v2 with: repository: emqx/emqx-fvt - ref: 1.0.3-dev1 + ref: 1.0.3-dev2 path: . - uses: actions/setup-java@v1 with: diff --git a/Makefile b/Makefile index 55ceec0ee..0a1fafbdd 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-2:23.3.4.9-3-alpine3 export EMQX_DEFAULT_RUNNER = alpine:3.14 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) -export EMQX_DASHBOARD_VERSION ?= v0.14.0 +export EMQX_DASHBOARD_VERSION ?= v0.17.0 export DOCKERFILE := deploy/docker/Dockerfile export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing ifeq ($(OS),Windows_NT) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 752b41abf..e4bb4a123 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -24,8 +24,4 @@ %% NOTE: This version number should be manually bumped for each release -%% NOTE: This version number should have 3 numeric parts -%% (Major.Minor.Patch), and extra info can be added after a final -%% hyphen. - --define(EMQX_RELEASE, "5.0.0-beta.2"). +-define(EMQX_RELEASE, "5.0-beta.3"). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl new file mode 100644 index 000000000..a92041b00 --- /dev/null +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_correction). + +%% API +-export([ add/2 ]). + +-type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float() + , any() => any() + }. + +-export_type([correction_value/0]). + +%%-------------------------------------------------------------------- +%%% API +%%-------------------------------------------------------------------- +-spec add(number(), correction_value()) -> {integer(), correction_value()}. +add(Inc, #{correction := Correction} = Data) -> + FixedInc = Inc + Correction, + IntInc = erlang:floor(FixedInc), + {IntInc, Data#{correction := FixedInc - IntInc}}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl index f0f452b0e..6c6016c06 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl @@ -20,7 +20,7 @@ %% API -export([ add/2, sub/2, mul/2 - , add_to_counter/3, put_to_counter/3, floor_div/2]). + , put_to_counter/3, floor_div/2]). -export_type([decimal/0, zero_or_float/0]). -type decimal() :: infinity | number(). @@ -60,22 +60,6 @@ floor_div(infinity, _) -> floor_div(A, B) -> erlang:floor(A / B). --spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> - {zero_or_float(), zero_or_float()}. -add_to_counter(_, _, infinity) -> - {0, 0}; -add_to_counter(Counter, Index, Val) when is_float(Val) -> - IntPart = erlang:floor(Val), - if IntPart > 0 -> - counters:add(Counter, Index, IntPart); - true -> - ok - end, - {IntPart, Val - IntPart}; -add_to_counter(Counter, Index, Val) -> - counters:add(Counter, Index, Val), - {Val, 0}. - -spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok. put_to_counter(_, _, infinity) -> ok; diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 9704c9e50..f683ef68e 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -87,7 +87,7 @@ -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter -export_type([index/0]). --import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]). +-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). %%-------------------------------------------------------------------- %% API @@ -317,12 +317,11 @@ longitudinal(#{id := Id, longitudinal(#{id := Id, rate := Rate, capacity := Capacity, - correction := Correction, counter := Counter, index := Index, obtained := Obtained} = Node, InFlow, Nodes) when Counter =/= undefined -> - Flow = add(erlang:min(InFlow, Rate), Correction), + Flow = erlang:min(InFlow, Rate), ShouldAlloc = case counters:get(Counter, Index) of @@ -340,11 +339,11 @@ longitudinal(#{id := Id, Avaiable when Avaiable > 0 -> %% XXX if capacity is infinity, and flow always > 0, the value in counter %% will be overflow at some point in the future, do we need to deal with this situation??? - {Alloced, Decimal} = add_to_counter(Counter, Index, Avaiable), + {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node), + counters:add(Counter, Index, Inc), - {Alloced, - Nodes#{Id := Node#{obtained := Obtained + Alloced, - correction := Decimal}}}; + {Inc, + Nodes#{Id := Node2#{obtained := Obtained + Inc}}}; _ -> {0, Nodes} end; @@ -411,31 +410,38 @@ dispatch_burst([], State) -> dispatch_burst(GroupL, #{root := #{burst := Burst}, nodes := Nodes} = State) -> - InFlow = erlang:floor(Burst / erlang:length(GroupL)), + InFlow = Burst / erlang:length(GroupL), Dispatch = fun({Zone, Childs}, NodeAcc) -> - #{id := ZoneId, - burst := ZoneBurst, - obtained := Obtained} = Zone, + #{id := ZoneId, + burst := ZoneBurst, + obtained := Obtained} = Zone, - ZoneFlow = erlang:min(InFlow, ZoneBurst), - EachFlow = ZoneFlow div erlang:length(Childs), - Zone2 = Zone#{obtained := Obtained + ZoneFlow}, - NodeAcc2 = NodeAcc#{ZoneId := Zone2}, - dispatch_burst_to_buckets(Childs, EachFlow, NodeAcc2) + case erlang:min(InFlow, ZoneBurst) of + 0 -> NodeAcc; + ZoneFlow -> + EachFlow = ZoneFlow / erlang:length(Childs), + {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), + Zone2 = Zone#{obtained := Obtained + Alloced}, + NodeAcc2#{ZoneId := Zone2} + end end, State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}. -spec dispatch_burst_to_buckets(list(node_id()), - non_neg_integer(), nodes()) -> nodes(). -dispatch_burst_to_buckets(Childs, InFlow, Nodes) -> - Each = fun(ChildId, NodeAcc) -> - #{counter := Counter, - index := Index, - obtained := Obtained} = Bucket = maps:get(ChildId, NodeAcc), - counters:add(Counter, Index, InFlow), - NodeAcc#{ChildId := Bucket#{obtained := Obtained + InFlow}} - end, - lists:foldl(Each, Nodes, Childs). + float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. +dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) -> + #{counter := Counter, + index := Index, + obtained := Obtained} = Bucket = maps:get(ChildId, Nodes), + {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), + + counters:add(Counter, Index, Inc), + + Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}}, + dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2); + +dispatch_burst_to_buckets([], _, Alloced, Nodes) -> + {Alloced, Nodes}. -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) -> diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 4665e2557..70e660f28 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -146,7 +146,6 @@ {counter, 'messages.dropped.expired'}, % QoS2 Messages expired {counter, 'messages.dropped.no_subscribers'}, % Messages dropped {counter, 'messages.forward'}, % Messages forward - {counter, 'messages.retained'}, % Messages retained {counter, 'messages.delayed'}, % Messages delayed {counter, 'messages.delivered'}, % Messages delivered {counter, 'messages.acked'} % Messages acked @@ -207,7 +206,7 @@ stop() -> gen_server:stop(?SERVER). %% BACKW: v4.3.0 upgrade_retained_delayed_counter_type() -> - Ks = ['messages.retained', 'messages.delayed'], + Ks = ['messages.delayed'], gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity). %%-------------------------------------------------------------------- @@ -556,7 +555,7 @@ reserved_idx('messages.dropped') -> 109; reserved_idx('messages.dropped.expired') -> 110; reserved_idx('messages.dropped.no_subscribers') -> 111; reserved_idx('messages.forward') -> 112; -reserved_idx('messages.retained') -> 113; +%%reserved_idx('messages.retained') -> 113; %% keep the index, new metrics can use this reserved_idx('messages.delayed') -> 114; reserved_idx('messages.delivered') -> 115; reserved_idx('messages.acked') -> 116; @@ -592,4 +591,3 @@ reserved_idx('olp.gc') -> 303; reserved_idx('olp.new_conn') -> 304; reserved_idx(_) -> undefined. - diff --git a/apps/emqx/test/emqx_metrics_SUITE.erl b/apps/emqx/test/emqx_metrics_SUITE.erl index bafbf7f38..95f9a0002 100644 --- a/apps/emqx/test/emqx_metrics_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_SUITE.erl @@ -71,19 +71,10 @@ t_inc_dec(_) -> with_metrics_server( fun() -> ?assertEqual(0, emqx_metrics:val('bytes.received')), - ?assertEqual(0, emqx_metrics:val('messages.retained')), ok = emqx_metrics:inc('bytes.received'), ok = emqx_metrics:inc('bytes.received', 2), ok = emqx_metrics:inc('bytes.received', 2), - ?assertEqual(5, emqx_metrics:val('bytes.received')), - ok = emqx_metrics:inc('messages.retained', 2), - ok = emqx_metrics:inc('messages.retained', 2), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:dec('messages.retained'), - ok = emqx_metrics:dec('messages.retained', 1), - ?assertEqual(2, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:set('messages.retained', 3), - ?assertEqual(3, emqx_metrics:val('messages.retained')) + ?assertEqual(5, emqx_metrics:val('bytes.received')) end). t_inc_recv(_) -> @@ -162,21 +153,12 @@ t_trans(_) -> ok = emqx_metrics:trans(inc, 'bytes.received'), ok = emqx_metrics:trans(inc, 'bytes.received', 2), ?assertEqual(0, emqx_metrics:val('bytes.received')), - ok = emqx_metrics:trans(inc, 'messages.retained', 2), - ok = emqx_metrics:trans(inc, 'messages.retained', 2), - ?assertEqual(0, emqx_metrics:val('messages.retained')), ok = emqx_metrics:commit(), ?assertEqual(3, emqx_metrics:val('bytes.received')), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:trans(dec, 'messages.retained'), - ok = emqx_metrics:trans(dec, 'messages.retained', 1), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:commit(), - ?assertEqual(2, emqx_metrics:val('messages.retained')) + ok = emqx_metrics:commit() end). with_metrics_server(Fun) -> {ok, _} = emqx_metrics:start_link(), _ = Fun(), ok = emqx_metrics:stop(). - diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 8ce1790d2..55acf004c 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -288,7 +288,7 @@ systopic_metrics() -> <<"messages/qos2/received">>, <<"messages/qos2/sent">>, <<"messages/publish">>, <<"messages/dropped">>, <<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>, - <<"messages/forward">>, <<"messages/retained">>, + <<"messages/forward">>, <<"messages/delayed">>, <<"messages/delivered">>, <<"messages/acked">>], ?LET({Nodename, T}, diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index c839a1a13..6eb06efc1 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -19,6 +19,7 @@ -include("emqx_authn.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). -behaviour(hocon_schema). -behaviour(emqx_authentication). @@ -77,7 +78,7 @@ validations() -> ]. url(type) -> binary(); -url(validator) -> [fun check_url/1]; +url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")]; url(nullable) -> false; url(_) -> undefined. @@ -118,16 +119,16 @@ create(_AuthenticatorID, Config) -> create(Config). create(#{method := Method, - url := URL, + url := RawURL, headers := Headers, body := Body, request_timeout := RequestTimeout} = Config) -> - #{path := Path, - query := Query} = URIMap = parse_url(URL), + {BsaeUrlWithPath, Query} = parse_fullpath(RawURL), + URIMap = parse_url(BsaeUrlWithPath), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), State = #{method => Method, - path => Path, - base_query => cow_qs:parse_qs(list_to_binary(Query)), + path => maps:get(path, URIMap), + base_query => cow_qs:parse_qs(to_bin(Query)), headers => maps:to_list(Headers), body => maps:to_list(Body), request_timeout => RequestTimeout, @@ -204,11 +205,8 @@ destroy(#{resource_id := ResourceId}) -> %% Internal functions %%-------------------------------------------------------------------- -check_url(URL) -> - case emqx_http_lib:uri_parse(URL) of - {ok, _} -> true; - {error, _} -> false - end. +parse_fullpath(RawURL) -> + cow_http:parse_fullpath(to_bin(RawURL)). check_body(Body) -> lists:all( @@ -234,7 +232,8 @@ transform_header_name(Headers) -> end, #{}, Headers). check_ssl_opts(Conf) -> - case parse_url(get_conf_val("url", Conf)) of + {BaseUrlWithPath, _Query} = parse_fullpath(get_conf_val("url", Conf)), + case parse_url(BaseUrlWithPath) of #{scheme := https} -> case get_conf_val("ssl.enable", Conf) of true -> ok; @@ -264,12 +263,13 @@ generate_request(Credential, #{method := Method, headers := Headers, body := Body0}) -> Body = replace_placeholders(Body0, Credential), + NBaseQuery = replace_placeholders(BaseQuery, Credential), case Method of get -> - NPath = append_query(Path, BaseQuery ++ Body), + NPath = append_query(Path, NBaseQuery ++ Body), {NPath, Headers}; post -> - NPath = append_query(Path, BaseQuery), + NPath = append_query(Path, NBaseQuery), ContentType = proplists:get_value(<<"content-type">>, Headers), NBody = serialize_body(ContentType, Body), {NPath, Headers, NBody} diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index 1fd225024..53a1fec5f 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -412,9 +412,22 @@ get_raw_sources() -> RawSources = emqx:get_raw_config([authorization, sources], []), Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}}, Conf = #{<<"sources">> => RawSources}, - #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, - #{only_fill_defaults => true}), - Sources. + #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, #{only_fill_defaults => true}), + merge_default_headers(Sources). + +merge_default_headers(Sources) -> + lists:map(fun(Source) -> + Convert = + case Source of + #{<<"method">> := <<"get">>} -> + emqx_authz_schema:headers_no_content_type(converter); + #{<<"method">> := <<"post">>} -> + emqx_authz_schema:headers(converter); + _ -> fun(H) -> H end + end, + Headers = Convert(maps:get(<<"headers">>, Source, #{})), + Source#{<<"headers">> => Headers} + end, Sources). get_raw_source(Type) -> lists:filter(fun (#{<<"type">> := T}) -> diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index f6642f245..e497aae19 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -29,6 +29,7 @@ , destroy/1 , dry_run/1 , authorize/4 + , parse_url/1 ]). -ifdef(TEST). @@ -39,62 +40,29 @@ description() -> "AuthZ with http". -init(Source) -> - case emqx_authz_utils:create_resource(emqx_connector_http, Source) of +init(Config) -> + NConfig = parse_config(Config), + case emqx_authz_utils:create_resource(emqx_connector_http, NConfig) of {error, Reason} -> error({load_config_error, Reason}); - {ok, Id} -> Source#{annotations => #{id => Id}} + {ok, Id} -> NConfig#{annotations => #{id => Id}} end. destroy(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove_local(Id). -dry_run(Source) -> - emqx_resource:create_dry_run_local(emqx_connector_http, Source). +dry_run(Config) -> + emqx_resource:create_dry_run_local(emqx_connector_http, parse_config(Config)). -authorize(Client, PubSub, Topic, - #{type := http, - query := Query, - path := Path, - headers := Headers, - method := Method, - request_timeout := RequestTimeout, - annotations := #{id := ResourceID} - } = Source) -> - Request = case Method of - get -> - Path1 = replvar( - Path ++ "?" ++ Query, - PubSub, - Topic, - maps:to_list(Client), - fun var_uri_encode/1), - - {Path1, maps:to_list(Headers)}; - - _ -> - Body0 = maps:get(body, Source, #{}), - Body1 = replvar_deep( - Body0, - PubSub, - Topic, - maps:to_list(Client), - fun var_bin_encode/1), - - Body2 = serialize_body( - maps:get(<<"content-type">>, Headers, <<"application/json">>), - Body1), - - Path1 = replvar( - Path, - PubSub, - Topic, - maps:to_list(Client), - fun var_uri_encode/1), - - {Path1, maps:to_list(Headers), Body2} - end, - HttpResult = emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}), - case HttpResult of +authorize( Client + , PubSub + , Topic + , #{ type := http + , annotations := #{id := ResourceID} + , method := Method + , request_timeout := RequestTimeout + } = Config) -> + Request = generate_request(PubSub, Topic, Client, Config), + case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of {ok, 200, _Headers} -> {matched, allow}; {ok, 204, _Headers} -> @@ -112,12 +80,75 @@ authorize(Client, PubSub, Topic, ignore end. +parse_config(#{ url := URL + , method := Method + , headers := Headers + , request_timeout := ReqTimeout + } = Conf) -> + {BaseURLWithPath, Query} = parse_fullpath(URL), + BaseURLMap = parse_url(BaseURLWithPath), + Conf#{ method => Method + , base_url => maps:remove(query, BaseURLMap) + , base_query => cow_qs:parse_qs(bin(Query)) + , body => maps:get(body, Conf, #{}) + , headers => Headers + , request_timeout => ReqTimeout + }. + +parse_fullpath(RawURL) -> + cow_http:parse_fullpath(bin(RawURL)). + +parse_url(URL) + when URL =:= undefined -> + #{}; +parse_url(URL) -> + {ok, URIMap} = emqx_http_lib:uri_parse(URL), + case maps:get(query, URIMap, undefined) of + undefined -> + URIMap#{query => ""}; + _ -> + URIMap + end. + +generate_request( PubSub + , Topic + , Client + , #{ method := Method + , base_url := #{path := Path} + , base_query := BaseQuery + , headers := Headers + , body := Body0 + }) -> + Body = replace_placeholders(maps:to_list(Body0), PubSub, Topic, Client), + NBaseQuery = replace_placeholders(BaseQuery, PubSub, Topic, Client), + case Method of + get -> + NPath = append_query(Path, NBaseQuery ++ Body), + {NPath, Headers}; + _ -> + NPath = append_query(Path, NBaseQuery), + NBody = serialize_body( + proplists:get_value(<<"Accept">>, Headers, <<"application/json">>), + Body + ), + {NPath, Headers, NBody} + end. + +append_query(Path, []) -> + Path; +append_query(Path, Query) -> + Path ++ "?" ++ binary_to_list(query_string(Query)). + query_string(Body) -> - query_string(maps:to_list(Body), []). + query_string(Body, []). query_string([], Acc) -> - <<$&, Str/binary>> = iolist_to_binary(lists:reverse(Acc)), - Str; + case iolist_to_binary(lists:reverse(Acc)) of + <<$&, Str/binary>> -> + Str; + <<>> -> + <<>> + end; query_string([{K, V} | More], Acc) -> query_string( More , [ ["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)] @@ -128,67 +159,36 @@ serialize_body(<<"application/json">>, Body) -> serialize_body(<<"application/x-www-form-urlencoded">>, Body) -> query_string(Body). +replace_placeholders(KVs, PubSub, Topic, Client) -> + replace_placeholders(KVs, PubSub, Topic, Client, []). -replvar_deep(Map, PubSub, Topic, Vars, VarEncode) when is_map(Map) -> - maps:from_list( - lists:map( - fun({Key, Value}) -> - {replvar(Key, PubSub, Topic, Vars, VarEncode), - replvar_deep(Value, PubSub, Topic, Vars, VarEncode)} - end, - maps:to_list(Map))); -replvar_deep(List, PubSub, Topic, Vars, VarEncode) when is_list(List) -> - lists:map( - fun(Value) -> - replvar_deep(Value, PubSub, Topic, Vars, VarEncode) - end, - List); -replvar_deep(Number, _PubSub, _Topic, _Vars, _VarEncode) when is_number(Number) -> - Number; -replvar_deep(Binary, PubSub, Topic, Vars, VarEncode) when is_binary(Binary) -> - replvar(Binary, PubSub, Topic, Vars, VarEncode). +replace_placeholders([], _PubSub, _Topic, _Client, Acc) -> + lists:reverse(Acc); +replace_placeholders([{K, V0} | More], PubSub, Topic, Client, Acc) -> + case replace_placeholder(V0, PubSub, Topic, Client) of + undefined -> + error({cannot_get_variable, V0}); + V -> + replace_placeholders(More, PubSub, Topic, Client, [{bin(K), bin(V)} | Acc]) + end. -replvar(Str0, PubSub, Topic, [], VarEncode) -> - NTopic = emqx_http_lib:uri_encode(Topic), - Str1 = re:replace(Str0, emqx_authz:ph_to_re(?PH_S_TOPIC), - VarEncode(NTopic), [global, {return, binary}]), - re:replace(Str1, emqx_authz:ph_to_re(?PH_S_ACTION), - VarEncode(PubSub), [global, {return, binary}]); +replace_placeholder(?PH_USERNAME, _PubSub, _Topic, Client) -> + bin(maps:get(username, Client, undefined)); +replace_placeholder(?PH_CLIENTID, _PubSub, _Topic, Client) -> + bin(maps:get(clientid, Client, undefined)); +replace_placeholder(?PH_HOST, _PubSub, _Topic, Client) -> + inet_parse:ntoa(maps:get(peerhost, Client, undefined)); +replace_placeholder(?PH_PROTONAME, _PubSub, _Topic, Client) -> + bin(maps:get(protocol, Client, undefined)); +replace_placeholder(?PH_MOUNTPOINT, _PubSub, _Topic, Client) -> + bin(maps:get(mountpoint, Client, undefined)); +replace_placeholder(?PH_TOPIC, _PubSub, Topic, _Client) -> + bin(emqx_http_lib:uri_encode(Topic)); +replace_placeholder(?PH_ACTION, PubSub, _Topic, _Client) -> + bin(PubSub); - -replvar(Str, PubSub, Topic, [{username, Username} | Rest], VarEncode) -> - Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_USERNAME), - VarEncode(Username), [global, {return, binary}]), - replvar(Str1, PubSub, Topic, Rest, VarEncode); - -replvar(Str, PubSub, Topic, [{clientid, Clientid} | Rest], VarEncode) -> - Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_CLIENTID), - VarEncode(Clientid), [global, {return, binary}]), - replvar(Str1, PubSub, Topic, Rest, VarEncode); - -replvar(Str, PubSub, Topic, [{peerhost, IpAddress} | Rest], VarEncode) -> - Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PEERHOST), - VarEncode(inet_parse:ntoa(IpAddress)), [global, {return, binary}]), - replvar(Str1, PubSub, Topic, Rest, VarEncode); - -replvar(Str, PubSub, Topic, [{protocol, Protocol} | Rest], VarEncode) -> - Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PROTONAME), - VarEncode(Protocol), [global, {return, binary}]), - replvar(Str1, PubSub, Topic, Rest, VarEncode); - -replvar(Str, PubSub, Topic, [{mountpoint, Mountpoint} | Rest], VarEncode) -> - Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT), - VarEncode(Mountpoint), [global, {return, binary}]), - replvar(Str1, PubSub, Topic, Rest, VarEncode); - -replvar(Str, PubSub, Topic, [_Unknown | Rest], VarEncode) -> - replvar(Str, PubSub, Topic, Rest, VarEncode). - -var_uri_encode(S) -> - emqx_http_lib:uri_encode(bin(S)). - -var_bin_encode(S) -> - bin(S). +replace_placeholder(Constant, _, _, _) -> + Constant. bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B; diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 58babf3af..421747983 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -17,6 +17,7 @@ -module(emqx_authz_schema). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). -reflect_type([ permission/0 , action/0 @@ -28,9 +29,15 @@ -export([ namespace/0 , roots/0 , fields/1 + , validations/0 + ]). + +-export([ headers_no_content_type/1 + , headers/1 ]). -import(emqx_schema, [mk_duration/2]). +-include_lib("hocon/include/hoconsc.hrl"). %%-------------------------------------------------------------------- %% Hocon Schema @@ -138,11 +145,10 @@ fields(redis_cluster) -> http_common_fields() -> [ {type, #{type => http}} , {enable, #{type => boolean(), default => true}} + , {url, fun url/1} , {request_timeout, mk_duration("request timeout", #{default => "30s"})} , {body, #{type => map(), nullable => true}} - , {path, #{type => string(), default => ""}} - , {query, #{type => string(), default => ""}} - ] ++ emqx_connector_http:fields(config). + ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). mongo_common_fields() -> [ {collection, #{type => atom()}} @@ -152,22 +158,32 @@ mongo_common_fields() -> default => true}} ]. -headers(type) -> map(); +validations() -> + [ {check_ssl_opts, fun check_ssl_opts/1} + , {check_headers, fun check_headers/1} + ]. + +headers(type) -> list({binary(), binary()}); headers(converter) -> fun(Headers) -> - maps:merge(default_headers(), transform_header_name(Headers)) + maps:to_list(maps:merge(default_headers(), transform_header_name(Headers))) end; headers(default) -> default_headers(); headers(_) -> undefined. -headers_no_content_type(type) -> map(); +headers_no_content_type(type) -> list({binary(), binary()}); headers_no_content_type(converter) -> fun(Headers) -> - maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) + maps:to_list(maps:merge(default_headers_no_content_type(), transform_header_name(Headers))) end; headers_no_content_type(default) -> default_headers_no_content_type(); headers_no_content_type(_) -> undefined. +url(type) -> binary(); +url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")]; +url(nullable) -> false; +url(_) -> undefined. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -190,6 +206,28 @@ transform_header_name(Headers) -> maps:put(K, V, Acc) end, #{}, Headers). +check_ssl_opts(Conf) + when Conf =:= #{} -> + true; +check_ssl_opts(Conf) -> + case emqx_authz_http:parse_url(hocon_schema:get_value("config.url", Conf)) of + #{scheme := https} -> + case hocon_schema:get_value("config.ssl.enable", Conf) of + true -> ok; + false -> false + end; + #{scheme := http} -> + ok + end. + +check_headers(Conf) + when Conf =:= #{} -> + true; +check_headers(Conf) -> + Method = to_bin(hocon_schema:get_value("config.method", Conf)), + Headers = hocon_schema:get_value("config.headers", Conf), + Method =:= <<"post">> orelse (not lists:member(<<"content-type">>, Headers)). + union_array(Item) when is_list(Item) -> hoconsc:array(hoconsc:union(Item)). @@ -225,3 +263,9 @@ to_list(A) when is_atom(A) -> to_list(B) when is_binary(B) -> binary_to_list(B). +to_bin(A) when is_atom(A) -> + atom_to_binary(A); +to_bin(B) when is_binary(B) -> + B; +to_bin(L) when is_list(L) -> + list_to_binary(L). diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 6612eb2c0..17c9ca519 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -64,9 +64,7 @@ set_special_configs(_App) -> -define(SOURCE1, #{<<"type">> => <<"http">>, <<"enable">> => true, - <<"base_url">> => <<"https://example.com:443/">>, - <<"path">> => <<"a/b">>, - <<"query">> => <<"c=d">>, + <<"url">> => <<"https://example.com:443/a/b?c=d">>, <<"headers">> => #{}, <<"method">> => <<"get">>, <<"request_timeout">> => 5000 diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index 71dc52a53..614ecda19 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -30,9 +30,7 @@ -define(SOURCE1, #{<<"type">> => <<"http">>, <<"enable">> => true, - <<"base_url">> => <<"https://fake.com:443/">>, - <<"path">> => <<"foo">>, - <<"query">> => <<"a=b">>, + <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>, <<"headers">> => #{}, <<"method">> => <<"get">>, <<"request_timeout">> => <<"5s">> diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index f1ccb9441..40c5f15ba 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -156,49 +156,14 @@ t_query_params(_Config) -> Req = cowboy_req:reply(200, Req0), {ok, Req, State} end, - #{<<"query">> => <<"username=${username}&" - "clientid=${clientid}&" - "peerhost=${peerhost}&" - "proto_name=${proto_name}&" - "mountpoint=${mountpoint}&" - "topic=${topic}&" - "action=${action}">> - }), - - ClientInfo = #{clientid => <<"client id">>, - username => <<"user name">>, - peerhost => {127,0,0,1}, - protocol => <<"MQTT">>, - mountpoint => <<"MOUNTPOINT">>, - zone => default, - listener => {tcp, default} - }, - - ?assertEqual( - allow, - emqx_access_control:authorize(ClientInfo, publish, <<"t">>)). - -t_path_params(_Config) -> - ok = setup_handler_and_config( - fun(Req0, State) -> - <<"/authz/" - "username/user%20name/" - "clientid/client%20id/" - "peerhost/127.0.0.1/" - "proto_name/MQTT/" - "mountpoint/MOUNTPOINT/" - "topic/t/" - "action/publish">> = cowboy_req:path(Req0), - Req = cowboy_req:reply(200, Req0), - {ok, Req, State} - end, - #{<<"path">> => <<"username/${username}/" - "clientid/${clientid}/" - "peerhost/${peerhost}/" - "proto_name/${proto_name}/" - "mountpoint/${mountpoint}/" - "topic/${topic}/" - "action/${action}">> + #{<<"url">> => <<"http://127.0.0.1:33333/authz/users/?" + "username=${username}&" + "clientid=${clientid}&" + "peerhost=${host}&" + "proto_name=${proto_name}&" + "mountpoint=${mountpoint}&" + "topic=${topic}&" + "action=${action}">> }), ClientInfo = #{clientid => <<"client id">>, @@ -218,23 +183,16 @@ t_json_body(_Config) -> ok = setup_handler_and_config( fun(Req0, State) -> ?assertEqual( - <<"/authz/" - "username/user%20name/" - "clientid/client%20id/" - "peerhost/127.0.0.1/" - "proto_name/MQTT/" - "mountpoint/MOUNTPOINT/" - "topic/t/" - "action/publish">>, + <<"/authz/users/">>, cowboy_req:path(Req0)), {ok, RawBody, Req1} = cowboy_req:read_body(Req0), ?assertMatch( #{<<"username">> := <<"user name">>, - <<"CLIENT_client id">> := <<"client id">>, - <<"peerhost">> := [<<"127.0.0.1">>, 1], - <<"proto_name">> := #{<<"proto">> := <<"MQTT">>}, + <<"CLIENT">> := <<"client id">>, + <<"peerhost">> := <<"127.0.0.1">>, + <<"proto_name">> := <<"MQTT">>, <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>}, @@ -244,17 +202,10 @@ t_json_body(_Config) -> {ok, Req, State} end, #{<<"method">> => <<"post">>, - <<"path">> => <<"username/${username}/" - "clientid/${clientid}/" - "peerhost/${peerhost}/" - "proto_name/${proto_name}/" - "mountpoint/${mountpoint}/" - "topic/${topic}/" - "action/${action}">>, <<"body">> => #{<<"username">> => <<"${username}">>, - <<"CLIENT_${clientid}">> => <<"${clientid}">>, - <<"peerhost">> => [<<"${peerhost}">>, 1], - <<"proto_name">> => #{<<"proto">> => <<"${proto_name}">>}, + <<"CLIENT">> => <<"${clientid}">>, + <<"peerhost">> => <<"${host}">>, + <<"proto_name">> => <<"${proto_name}">>, <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, <<"action">> => <<"${action}">>} @@ -278,17 +229,10 @@ t_form_body(_Config) -> ok = setup_handler_and_config( fun(Req0, State) -> ?assertEqual( - <<"/authz/" - "username/user%20name/" - "clientid/client%20id/" - "peerhost/127.0.0.1/" - "proto_name/MQTT/" - "mountpoint/MOUNTPOINT/" - "topic/t/" - "action/publish">>, + <<"/authz/users/">>, cowboy_req:path(Req0)), - - {ok, PostVars, Req1} = cowboy_req:read_urlencoded_body(Req0), + + {ok, [{PostVars, true}], Req1} = cowboy_req:read_urlencoded_body(Req0), ?assertMatch( #{<<"username">> := <<"user name">>, @@ -298,22 +242,15 @@ t_form_body(_Config) -> <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>}, - maps:from_list(PostVars)), + jiffy:decode(PostVars, [return_maps])), Req = cowboy_req:reply(200, Req1), {ok, Req, State} end, #{<<"method">> => <<"post">>, - <<"path">> => <<"username/${username}/" - "clientid/${clientid}/" - "peerhost/${peerhost}/" - "proto_name/${proto_name}/" - "mountpoint/${mountpoint}/" - "topic/${topic}/" - "action/${action}">>, <<"body">> => #{<<"username">> => <<"${username}">>, <<"clientid">> => <<"${clientid}">>, - <<"peerhost">> => <<"${peerhost}">>, + <<"peerhost">> => <<"${host}">>, <<"proto_name">> => <<"${proto_name}">>, <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, @@ -349,7 +286,8 @@ t_create_replace(_Config) -> Req = cowboy_req:reply(200, Req0), {ok, Req, State} end, - #{<<"base_url">> => <<"http://127.0.0.1:33333/authz">>}), + #{<<"url">> => + <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>}), ?assertEqual( allow, @@ -358,7 +296,8 @@ t_create_replace(_Config) -> %% Changing to other bad config does not work BadConfig = maps:merge( raw_http_authz_config(), - #{<<"base_url">> => <<"http://127.0.0.1:33332/authz">>}), + #{<<"url">> => + <<"http://127.0.0.1:33332/authz/users/?topic=${topic}&action=${action}">>}), ?assertMatch( {error, _}, @@ -371,7 +310,8 @@ t_create_replace(_Config) -> %% Changing to valid config OkConfig = maps:merge( raw_http_authz_config(), - #{<<"base_url">> => <<"http://127.0.0.1:33333/authz">>}), + #{<<"url">> => + <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>}), ?assertMatch( {ok, _}, @@ -388,12 +328,9 @@ t_create_replace(_Config) -> raw_http_authz_config() -> #{ <<"enable">> => <<"true">>, - <<"type">> => <<"http">>, <<"method">> => <<"get">>, - <<"base_url">> => <<"http://127.0.0.1:33333/authz">>, - <<"path">> => <<"users/${username}/">>, - <<"query">> => <<"topic=${topic}&action=${action}">>, + <<"url">> => <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>, <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>} }. diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 6d596fa67..37f7b4321 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -262,11 +262,20 @@ preprocess_request(#{ , request_timeout => maps:get(request_timeout, Req, 30000) }. -preproc_headers(Headers) -> +preproc_headers(Headers) when is_map(Headers) -> maps:fold(fun(K, V, Acc) -> - Acc#{emqx_plugin_libs_rule:preproc_tmpl(bin(K)) => - emqx_plugin_libs_rule:preproc_tmpl(bin(V))} - end, #{}, Headers). + [{ + emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + emqx_plugin_libs_rule:preproc_tmpl(bin(V)) + } | Acc] + end, [], Headers); +preproc_headers(Headers) when is_list(Headers) -> + lists:map(fun({K, V}) -> + { + emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + emqx_plugin_libs_rule:preproc_tmpl(bin(V)) + } + end, Headers). process_request(#{ method := MethodTks, @@ -278,7 +287,7 @@ process_request(#{ Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)) , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg) , body => process_request_body(BodyTks, Msg) - , headers => maps:to_list(proc_headers(HeadersTks, Msg)) + , headers => proc_headers(HeadersTks, Msg) , request_timeout => ReqTimeout }. @@ -288,10 +297,12 @@ process_request_body(BodyTks, Msg) -> emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). proc_headers(HeaderTks, Msg) -> - maps:fold(fun(K, V, Acc) -> - Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) => - emqx_plugin_libs_rule:proc_tmpl(V, Msg)} - end, #{}, HeaderTks). + lists:map(fun({K, V}) -> + { + emqx_plugin_libs_rule:proc_tmpl(K, Msg), + emqx_plugin_libs_rule:proc_tmpl(V, Msg) + } + end, HeaderTks). make_method(M) when M == <<"POST">>; M == <<"post">> -> post; make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 129f00d02..2819cbbc0 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -221,27 +221,26 @@ t_mqtt_conn_bridge_ingress(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now + timer:sleep(50), {ok, 201, Bridge} = request(post, uri(["bridges"]), ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_INGRESS }), - #{ <<"id">> := BridgeIDIngress , <<"type">> := <<"mqtt">> - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDIngress, 5), %% we now test if the bridge works as expected - RemoteTopic = <<"remote_topic/1">>, LocalTopic = <<"local_topic/", RemoteTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), + timer:sleep(100), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -295,22 +294,21 @@ t_mqtt_conn_bridge_egress(_) -> <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - #{ <<"id">> := BridgeIDEgress , <<"type">> := ?CONNECTR_TYPE , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 5), %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), + timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic @@ -369,10 +367,9 @@ t_mqtt_conn_update(_) -> #{ <<"id">> := BridgeIDEgress , <<"type">> := <<"mqtt">> , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), - wait_for_resource_ready(BridgeIDEgress, 2), + wait_for_resource_ready(BridgeIDEgress, 5), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' @@ -424,6 +421,7 @@ t_mqtt_conn_update2(_) -> {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + wait_for_resource_ready(BridgeIDEgress, 5), ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"status">> := <<"connected">> }, jsx:decode(BridgeStr)), @@ -454,7 +452,7 @@ t_mqtt_conn_update3(_) -> #{ <<"id">> := BridgeIDEgress , <<"connector">> := ConnctorID } = jsx:decode(Bridge), - wait_for_resource_ready(BridgeIDEgress, 2), + wait_for_resource_ready(BridgeIDEgress, 5), %% delete the connector should fail because it is in use by a bridge {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), @@ -505,6 +503,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> LocalTopic = <<"local_topic/", RemoteTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), + timer:sleep(100), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. wait_for_resource_ready(BridgeIDIngress, 5), @@ -570,6 +569,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), + timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. wait_for_resource_ready(BridgeIDEgress, 5), @@ -593,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RuleTopic = <<"t/1">>, RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, emqx:subscribe(RemoteTopic2), + timer:sleep(100), wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(RuleTopic, Payload2)), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index be8b4d074..1fbdb9eca 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -436,6 +436,7 @@ typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, examp typename_to_spec("bytesize()", _Mod) -> #{type => string, example => <<"32MB">>}; typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>}; typename_to_spec("map()", _Mod) -> #{type => object, example => #{}}; +typename_to_spec("{binary(), binary()}", _Mod) -> #{type => object, example => #{}}; typename_to_spec("comma_separated_list()", _Mod) -> #{type => string, example => <<"item1,item2">>}; typename_to_spec("comma_separated_atoms()", _Mod) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index f56dec88a..1def6ebdb 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -70,7 +70,7 @@ -define(DEFAULT_IDLE_TIMEOUT, 30000). -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). -define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304, - message_queue_len => 32000}). + max_message_queue_len => 32000}). -elvis([{elvis_style, god_modules, disable}]). diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 456ba473c..a59822795 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -235,7 +235,7 @@ accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) -> %% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...} do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn; -do_accumulation_metrics(MetricsIn, MetricsAcc) -> +do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) -> Keys = maps:keys(MetricsIn), lists:foldl(fun(Key, Acc) -> InVal = maps:get(Key, MetricsIn), diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index ad3c1d420..29f66670b 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -22,9 +22,11 @@ %% be used by the prometheus application -behaviour(prometheus_collector). +-include("emqx_prometheus.hrl"). + -include_lib("prometheus/include/prometheus.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). - +-include_lib("emqx/include/logger.hrl"). -import(prometheus_model_helpers, [ create_mf/5 @@ -32,6 +34,16 @@ , counter_metric/1 ]). +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + % for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% APIs -export([start_link/1]). @@ -58,6 +70,56 @@ -record(state, {push_gateway, timer, interval}). +%%-------------------------------------------------------------------- +%% update new config +update(Config) -> + case emqx_conf:update([prometheus], Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + case maps:get(<<"enable">>, Config, true) of + true -> + ok = restart(); + false -> + ok = stop() + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])). + +do_stop() -> + case emqx_prometheus_sup:stop_child(?APP) of + ok -> + ok; + {error, not_found} -> + ok + end. + +do_restart() -> + ok = do_stop(), + ok = do_start(), + ok. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index 84fb4262e..c129b1064 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -67,16 +67,13 @@ prometheus(get, _Params) -> {200, emqx:get_raw_config([<<"prometheus">>], #{})}; prometheus(put, #{body := Body}) -> - {ok, Config} = emqx:update_config([prometheus], Body), - case maps:get(<<"enable">>, Body) of - true -> - _ = emqx_prometheus_sup:stop_child(?APP), - emqx_prometheus_sup:start_child(?APP, maps:get(config, Config)); - false -> - _ = emqx_prometheus_sup:stop_child(?APP), - ok - end, - {200, emqx:get_raw_config([<<"prometheus">>], #{})}. + case emqx_prometheus:update(Body) of + {ok, NewConfig} -> + {200, NewConfig}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, 'INTERNAL_ERROR', Message} + end. stats(get, #{headers := Headers}) -> Type = diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 6b7a571c5..adb8f7c71 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -37,7 +37,9 @@ , clean/0 , delete/1 , page_read/3 - , post_config_update/5]). + , post_config_update/5 + , stats_fun/0 + ]). %% gen_server callbacks -export([ init/1 @@ -69,6 +71,7 @@ -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback clear_expired(context()) -> ok. -callback clean(context()) -> ok. +-callback size(context()) -> non_neg_integer(). %%-------------------------------------------------------------------- %% Hook API @@ -185,6 +188,9 @@ post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> call(Req) -> gen_server:call(?MODULE, Req, infinity). +stats_fun() -> + gen_server:cast(?MODULE, ?FUNCTION_NAME). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -226,6 +232,12 @@ handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. +handle_cast(stats_fun, #{context := Context} = State) -> + Mod = get_backend_module(), + Size = Mod:size(Context), + emqx_stats:setstat('retained.count', 'retained.max', Size), + {noreply, State}; + handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. @@ -485,8 +497,11 @@ close_resource(_) -> load(Context) -> _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}), _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}), + emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), ok. unload() -> emqx:unhook('message.publish', {?MODULE, on_message_publish}), - emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). + emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}), + emqx_stats:cancel_update(emqx_retainer_stats), + ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 16b3ee137..2209c760f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -30,7 +30,9 @@ , page_read/4 , match_messages/3 , clear_expired/1 - , clean/1]). + , clean/1 + , size/1 + ]). -export([create_resource/1]). @@ -73,7 +75,6 @@ store_retained(_, Msg =#message{topic = Topic}) -> ExpiryTime = emqx_retainer:get_expiry_time(Msg), case is_table_full() of false -> - ok = emqx_metrics:inc('messages.retained'), mria:dirty_write(?TAB, #retained{topic = topic2tokens(Topic), msg = Msg, @@ -155,6 +156,10 @@ match_messages(_, Topic, Cursor) -> clean(_) -> _ = mria:clear_table(?TAB), ok. + +size(_) -> + table_size(). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf index a2ceb4cbc..9477b2e2c 100644 --- a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf +++ b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf @@ -12,8 +12,8 @@ emqx_slow_subs { ## The eviction time of the record, which in the statistics record table ## - ## Default: 5m - expire_interval = 5m + ## Default: 300ms + expire_interval = 300ms ## The maximum number of records in the slow subscription statistics record table ## diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 7b2ba85a3..198959766 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -25,6 +25,18 @@ -include("emqx_statsd.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + %% for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% Interface -export([start_link/1]). @@ -44,6 +56,52 @@ estatsd_pid :: pid() }). +update(Config) -> + case emqx_conf:update([statsd], + Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + ok = stop(), + case maps:get(<<"enable">>, Config, true) of + true -> + ok = start(); + false -> + ignore + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})). + +do_stop() -> + emqx_statsd_sup:ensure_child_stopped(?APP). + +do_restart() -> + ok = do_stop(), + ok = do_start(), + ok. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + start_link(Opts) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 8bc0e608d..6fa657dcb 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -55,17 +55,8 @@ statsd(get, _Params) -> {200, emqx:get_raw_config([<<"statsd">>], #{})}; statsd(put, #{body := Body}) -> - case emqx:update_config([statsd], - Body, - #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{raw_config := NewConfig, config := Config}} -> - ok = emqx_statsd_sup:ensure_child_stopped(?APP), - case maps:get(<<"enable">>, Body) of - true -> - ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config)); - false -> - ok - end, + case emqx_statsd:update(Body) of + {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), diff --git a/rebar.config.erl b/rebar.config.erl index dc92d85ee..d0f123059 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -122,42 +122,42 @@ profiles() -> , {relx, relx(Vsn, cloud, bin, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx doc"}]} ]} , {'emqx-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, cloud, pkg, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx-pkg doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-pkg doc"}]} ]} , {'emqx-enterprise', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, cloud, bin, ee)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ee)} - , {post_hooks, [{compile, "./build emqx-enterprise doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise doc"}]} ]} , {'emqx-enterprise-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, cloud, pkg, ee)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ee)} - , {post_hooks, [{compile, "./build emqx-enterprise-pkg doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise-pkg doc"}]} ]} , {'emqx-edge', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, edge, bin, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx-edge doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge doc"}]} ]} , {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, edge, pkg, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx-edge-pkg doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge-pkg doc"}]} ]} , {check, [ {erl_opts, common_compile_opts()}