From e2240bdf03806923333038755d0a4a4f5884d745 Mon Sep 17 00:00:00 2001 From: lafirest Date: Tue, 28 Dec 2021 10:13:55 +0800 Subject: [PATCH 01/29] feat(emqx_limiter): improve burst implementation --- .../src/emqx_limiter_correction.erl | 35 +++++++++++ .../emqx_limiter/src/emqx_limiter_decimal.erl | 18 +----- .../emqx_limiter/src/emqx_limiter_server.erl | 58 ++++++++++--------- 3 files changed, 68 insertions(+), 43 deletions(-) create mode 100644 apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl new file mode 100644 index 000000000..a92041b00 --- /dev/null +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_correction). + +%% API +-export([ add/2 ]). + +-type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float() + , any() => any() + }. + +-export_type([correction_value/0]). + +%%-------------------------------------------------------------------- +%%% API +%%-------------------------------------------------------------------- +-spec add(number(), correction_value()) -> {integer(), correction_value()}. +add(Inc, #{correction := Correction} = Data) -> + FixedInc = Inc + Correction, + IntInc = erlang:floor(FixedInc), + {IntInc, Data#{correction := FixedInc - IntInc}}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl index 28b6f3385..e1fdeedcc 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl @@ -20,7 +20,7 @@ %% API -export([ add/2, sub/2, mul/2 - , add_to_counter/3, put_to_counter/3, floor_div/2]). + , put_to_counter/3, floor_div/2]). -export_type([decimal/0, zero_or_float/0]). -type decimal() :: infinity | number(). @@ -60,22 +60,6 @@ floor_div(infinity, _) -> floor_div(A, B) -> erlang:floor(A / B). --spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> - {zero_or_float(), zero_or_float()}. -add_to_counter(_, _, infinity) -> - {0, 0}; -add_to_counter(Counter, Index, Val) when is_float(Val) -> - IntPart = erlang:floor(Val), - if IntPart > 0 -> - counters:add(Counter, Index, IntPart); - true -> - ok - end, - {IntPart, Val - IntPart}; -add_to_counter(Counter, Index, Val) -> - counters:add(Counter, Index, Val), - {Val, 0}. - -spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok. put_to_counter(_, _, infinity) -> ok; diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 799d623bf..ee2ed3431 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -87,7 +87,7 @@ -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter -export_type([index/0]). --import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]). +-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). %%-------------------------------------------------------------------- %% API @@ -317,12 +317,11 @@ longitudinal(#{id := Id, longitudinal(#{id := Id, rate := Rate, capacity := Capacity, - correction := Correction, counter := Counter, index := Index, obtained := Obtained} = Node, InFlow, Nodes) when Counter =/= undefined -> - Flow = add(erlang:min(InFlow, Rate), Correction), + Flow = erlang:min(InFlow, Rate), ShouldAlloc = case counters:get(Counter, Index) of @@ -340,11 +339,11 @@ longitudinal(#{id := Id, Avaiable when Avaiable > 0 -> %% XXX if capacity is infinity, and flow always > 0, the value in counter %% will be overflow at some point in the future, do we need to deal with this situation??? - {Alloced, Decimal} = add_to_counter(Counter, Index, Avaiable), + {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node), + counters:add(Counter, Index, Inc), - {Alloced, - Nodes#{Id := Node#{obtained := Obtained + Alloced, - correction := Decimal}}}; + {Inc, + Nodes#{Id := Node2#{obtained := Obtained + Inc}}}; _ -> {0, Nodes} end; @@ -411,31 +410,38 @@ dispatch_burst([], State) -> dispatch_burst(GroupL, #{root := #{burst := Burst}, nodes := Nodes} = State) -> - InFlow = erlang:floor(Burst / erlang:length(GroupL)), + InFlow = Burst / erlang:length(GroupL), Dispatch = fun({Zone, Childs}, NodeAcc) -> - #{id := ZoneId, - burst := ZoneBurst, - obtained := Obtained} = Zone, + #{id := ZoneId, + burst := ZoneBurst, + obtained := Obtained} = Zone, - ZoneFlow = erlang:min(InFlow, ZoneBurst), - EachFlow = ZoneFlow div erlang:length(Childs), - Zone2 = Zone#{obtained := Obtained + ZoneFlow}, - NodeAcc2 = NodeAcc#{ZoneId := Zone2}, - dispatch_burst_to_buckets(Childs, EachFlow, NodeAcc2) + case erlang:min(InFlow, ZoneBurst) of + 0 -> NodeAcc; + ZoneFlow -> + EachFlow = ZoneFlow / erlang:length(Childs), + {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), + Zone2 = Zone#{obtained := Obtained + Alloced}, + NodeAcc2#{ZoneId := Zone2} + end end, State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}. -spec dispatch_burst_to_buckets(list(node_id()), - non_neg_integer(), nodes()) -> nodes(). -dispatch_burst_to_buckets(Childs, InFlow, Nodes) -> - Each = fun(ChildId, NodeAcc) -> - #{counter := Counter, - index := Index, - obtained := Obtained} = Bucket = maps:get(ChildId, NodeAcc), - counters:add(Counter, Index, InFlow), - NodeAcc#{ChildId := Bucket#{obtained := Obtained + InFlow}} - end, - lists:foldl(Each, Nodes, Childs). + float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. +dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) -> + #{counter := Counter, + index := Index, + obtained := Obtained} = Bucket = maps:get(ChildId, Nodes), + {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), + + counters:add(Counter, Index, Inc), + + Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}}, + dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2); + +dispatch_burst_to_buckets([], _, Alloced, Nodes) -> + {Alloced, Nodes}. -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) -> From 6404efd0ecf8fcc13c7139cb7f1a2f331c1c1aa8 Mon Sep 17 00:00:00 2001 From: lafirest Date: Fri, 31 Dec 2021 15:59:47 +0800 Subject: [PATCH 02/29] fix(emqx_slow_subs): fix default expire value --- apps/emqx_slow_subs/etc/emqx_slow_subs.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf index a2ceb4cbc..9477b2e2c 100644 --- a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf +++ b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf @@ -12,8 +12,8 @@ emqx_slow_subs { ## The eviction time of the record, which in the statistics record table ## - ## Default: 5m - expire_interval = 5m + ## Default: 300ms + expire_interval = 300ms ## The maximum number of records in the slow subscription statistics record table ## From 2c67ec44f450cfb62623cb5c1de7bb2db36e8eab Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Jan 2022 11:11:41 +0800 Subject: [PATCH 03/29] fix(topic-metrics): fix bad map for calculating metrics --- apps/emqx_modules/src/emqx_topic_metrics_api.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 1ba76579b..251cf4d81 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -235,7 +235,7 @@ accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) -> %% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...} do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn; -do_accumulation_metrics(MetricsIn, MetricsAcc) -> +do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) -> Keys = maps:keys(MetricsIn), lists:foldl(fun(Key, Acc) -> InVal = maps:get(Key, MetricsIn), From 54ec3009eedff78da56ea2409e24418f70caa7d5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 4 Jan 2022 10:00:32 +0800 Subject: [PATCH 04/29] chore(dashboard): update dashboard version to v0.16.0 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e034b5663..c32f1db73 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-2:23.3.4.9-3-alpine3 export EMQX_DEFAULT_RUNNER = alpine:3.14 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) -export EMQX_DASHBOARD_VERSION ?= v0.14.0 +export EMQX_DASHBOARD_VERSION ?= v0.16.0 export DOCKERFILE := deploy/docker/Dockerfile export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing ifeq ($(OS),Windows_NT) From a593921137ef82accf0d28d2185187c7ae2baa60 Mon Sep 17 00:00:00 2001 From: lafirest Date: Tue, 4 Jan 2022 15:21:44 +0800 Subject: [PATCH 05/29] fix(emqx_retainer): fix the stats function of retainer --- apps/emqx/src/emqx_metrics.erl | 6 ++--- apps/emqx/test/emqx_metrics_SUITE.erl | 22 ++----------------- apps/emqx/test/emqx_proper_types.erl | 2 +- apps/emqx_retainer/src/emqx_retainer.erl | 19 ++++++++++++++-- .../src/emqx_retainer_mnesia.erl | 9 ++++++-- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 2757ea492..2c8088c48 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -146,7 +146,6 @@ {counter, 'messages.dropped.expired'}, % QoS2 Messages expired {counter, 'messages.dropped.no_subscribers'}, % Messages dropped {counter, 'messages.forward'}, % Messages forward - {counter, 'messages.retained'}, % Messages retained {counter, 'messages.delayed'}, % Messages delayed {counter, 'messages.delivered'}, % Messages delivered {counter, 'messages.acked'} % Messages acked @@ -207,7 +206,7 @@ stop() -> gen_server:stop(?SERVER). %% BACKW: v4.3.0 upgrade_retained_delayed_counter_type() -> - Ks = ['messages.retained', 'messages.delayed'], + Ks = ['messages.delayed'], gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity). %%-------------------------------------------------------------------- @@ -556,7 +555,7 @@ reserved_idx('messages.dropped') -> 109; reserved_idx('messages.dropped.expired') -> 110; reserved_idx('messages.dropped.no_subscribers') -> 111; reserved_idx('messages.forward') -> 112; -reserved_idx('messages.retained') -> 113; +%%reserved_idx('messages.retained') -> 113; %% keep the index, new metrics can use this reserved_idx('messages.delayed') -> 114; reserved_idx('messages.delivered') -> 115; reserved_idx('messages.acked') -> 116; @@ -592,4 +591,3 @@ reserved_idx('olp.gc') -> 303; reserved_idx('olp.new_conn') -> 304; reserved_idx(_) -> undefined. - diff --git a/apps/emqx/test/emqx_metrics_SUITE.erl b/apps/emqx/test/emqx_metrics_SUITE.erl index f47e028bc..8b277e898 100644 --- a/apps/emqx/test/emqx_metrics_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_SUITE.erl @@ -71,19 +71,10 @@ t_inc_dec(_) -> with_metrics_server( fun() -> ?assertEqual(0, emqx_metrics:val('bytes.received')), - ?assertEqual(0, emqx_metrics:val('messages.retained')), ok = emqx_metrics:inc('bytes.received'), ok = emqx_metrics:inc('bytes.received', 2), ok = emqx_metrics:inc('bytes.received', 2), - ?assertEqual(5, emqx_metrics:val('bytes.received')), - ok = emqx_metrics:inc('messages.retained', 2), - ok = emqx_metrics:inc('messages.retained', 2), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:dec('messages.retained'), - ok = emqx_metrics:dec('messages.retained', 1), - ?assertEqual(2, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:set('messages.retained', 3), - ?assertEqual(3, emqx_metrics:val('messages.retained')) + ?assertEqual(5, emqx_metrics:val('bytes.received')) end). t_inc_recv(_) -> @@ -162,21 +153,12 @@ t_trans(_) -> ok = emqx_metrics:trans(inc, 'bytes.received'), ok = emqx_metrics:trans(inc, 'bytes.received', 2), ?assertEqual(0, emqx_metrics:val('bytes.received')), - ok = emqx_metrics:trans(inc, 'messages.retained', 2), - ok = emqx_metrics:trans(inc, 'messages.retained', 2), - ?assertEqual(0, emqx_metrics:val('messages.retained')), ok = emqx_metrics:commit(), ?assertEqual(3, emqx_metrics:val('bytes.received')), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:trans(dec, 'messages.retained'), - ok = emqx_metrics:trans(dec, 'messages.retained', 1), - ?assertEqual(4, emqx_metrics:val('messages.retained')), - ok = emqx_metrics:commit(), - ?assertEqual(2, emqx_metrics:val('messages.retained')) + ok = emqx_metrics:commit() end). with_metrics_server(Fun) -> {ok, _} = emqx_metrics:start_link(), _ = Fun(), ok = emqx_metrics:stop(). - diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 32aba9674..fcae60294 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -288,7 +288,7 @@ systopic_metrics() -> <<"messages/qos2/received">>, <<"messages/qos2/sent">>, <<"messages/publish">>, <<"messages/dropped">>, <<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>, - <<"messages/forward">>, <<"messages/retained">>, + <<"messages/forward">>, <<"messages/delayed">>, <<"messages/delivered">>, <<"messages/acked">>], ?LET({Nodename, T}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index e1780cc08..51a28897c 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -37,7 +37,9 @@ , clean/0 , delete/1 , page_read/3 - , post_config_update/5]). + , post_config_update/5 + , stats_fun/0 + ]). %% gen_server callbacks -export([ init/1 @@ -69,6 +71,7 @@ -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback clear_expired(context()) -> ok. -callback clean(context()) -> ok. +-callback size(context()) -> non_neg_integer(). %%-------------------------------------------------------------------- %% Hook API @@ -185,6 +188,9 @@ post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> call(Req) -> gen_server:call(?MODULE, Req, infinity). +stats_fun() -> + gen_server:cast(?MODULE, ?FUNCTION_NAME). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -226,6 +232,12 @@ handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. +handle_cast(stats_fun, #{context := Context} = State) -> + Mod = get_backend_module(), + Size = Mod:size(Context), + emqx_stats:setstat('retained.count', 'retained.max', Size), + {noreply, State}; + handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -485,8 +497,11 @@ close_resource(_) -> load(Context) -> _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}), _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}), + emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), ok. unload() -> emqx:unhook('message.publish', {?MODULE, on_message_publish}), - emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). + emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}), + emqx_stats:cancel_update(emqx_retainer_stats), + ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index e5e347fdc..1472acbc2 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -30,7 +30,9 @@ , page_read/4 , match_messages/3 , clear_expired/1 - , clean/1]). + , clean/1 + , size/1 + ]). -export([create_resource/1]). @@ -75,7 +77,6 @@ store_retained(_, Msg =#message{topic = Topic}) -> ExpiryTime = emqx_retainer:get_expiry_time(Msg), case is_table_full() of false -> - ok = emqx_metrics:inc('messages.retained'), mria:dirty_write(?TAB, #retained{topic = topic2tokens(Topic), msg = Msg, @@ -158,6 +159,10 @@ match_messages(_, Topic, Cursor) -> clean(_) -> _ = mria:clear_table(?TAB), ok. + +size(_) -> + table_size(). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- From ab37c48860982d9ca0860b25e1f5cde160a832b3 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 30 Dec 2021 20:45:26 +0800 Subject: [PATCH 06/29] fix(authz): authz http resource url query string --- apps/emqx_authz/src/emqx_authz_http.erl | 156 ++++++++++++++-------- apps/emqx_authz/src/emqx_authz_schema.erl | 12 +- 2 files changed, 106 insertions(+), 62 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index fe48b35d0..3721aef28 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -40,43 +40,28 @@ description() -> "AuthZ with http". -init(#{url := Url} = Source) -> - NSource = maps:put(base_url, maps:remove(query, Url), Source), - case emqx_authz_utils:create_resource(emqx_connector_http, NSource) of +init(Config) -> + NConfig = parse_config(Config), + case emqx_authz_utils:create_resource(emqx_connector_http, NConfig) of {error, Reason} -> error({load_config_error, Reason}); - {ok, Id} -> Source#{annotations => #{id => Id}} + {ok, Id} -> NConfig#{annotations => #{id => Id}} end. destroy(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove_local(Id). -dry_run(Source) -> - URIMap = maps:get(url, Source), - NSource = maps:put(base_url, maps:remove(query, URIMap), Source), - emqx_resource:create_dry_run_local(emqx_connector_http, NSource). +dry_run(Config) -> + emqx_resource:create_dry_run_local(emqx_connector_http, parse_config(Config)). -authorize(Client, PubSub, Topic, - #{type := http, - url := #{path := Path} = URL, - headers := Headers, - method := Method, - request_timeout := RequestTimeout, - annotations := #{id := ResourceID} - } = Source) -> - Request = case Method of - get -> - Query = maps:get(query, URL, ""), - Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client), - {Path1, maps:to_list(Headers)}; - _ -> - Body0 = serialize_body( - maps:get('Accept', Headers, <<"application/json">>), - maps:get(body, Source, #{}) - ), - Body1 = replvar(Body0, PubSub, Topic, Client), - Path1 = replvar(Path, PubSub, Topic, Client), - {Path1, maps:to_list(Headers), Body1} - end, +authorize( Client + , PubSub + , Topic + , #{ type := http + , annotations := #{id := ResourceID} + , method := Method + , request_timeout := RequestTimeout + } = Config) -> + Request = generate_request(PubSub, Topic, Client, Config), case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of {ok, 200, _Headers} -> {matched, allow}; @@ -84,6 +69,8 @@ authorize(Client, PubSub, Topic, {matched, allow}; {ok, 200, _Headers, _Body} -> {matched, allow}; + {ok, _Status, _Headers} -> + nomatch; {ok, _Status, _Headers, _Body} -> nomatch; {error, Reason} -> @@ -93,6 +80,24 @@ authorize(Client, PubSub, Topic, ignore end. +parse_config(#{ url := URL + , method := Method + , headers := Headers + , request_timeout := ReqTimeout + } = Conf) -> + {BaseURLWithPath, Query} = parse_fullpath(URL), + BaseURLMap = parse_url(BaseURLWithPath), + Conf#{ method => Method + , base_url => maps:remove(query, BaseURLMap) + , base_query => cow_qs:parse_qs(bin(Query)) + , body => maps:get(body, Conf, #{}) + , headers => Headers + , request_timeout => ReqTimeout + }. + +parse_fullpath(RawURL) -> + cow_http:parse_fullpath(bin(RawURL)). + parse_url(URL) when URL =:= undefined -> #{}; @@ -105,12 +110,45 @@ parse_url(URL) -> URIMap end. +generate_request( PubSub + , Topic + , Client + , #{ method := Method + , base_url := #{path := Path} + , base_query := BaseQuery + , headers := Headers + , body := Body0 + }) -> + Body = replace_placeholders(maps:to_list(Body0), PubSub, Topic, Client), + NBaseQuery = replace_placeholders(BaseQuery, PubSub, Topic, Client), + case Method of + get -> + NPath = append_query(Path, NBaseQuery ++ Body), + {NPath, maps:to_list(Headers)}; + _ -> + NPath = append_query(Path, NBaseQuery), + NBody = serialize_body( + maps:get(<<"Accept">>, Headers, <<"application/json">>), + Body + ), + {NPath, maps:to_list(Headers), NBody} + end. + +append_query(Path, []) -> + Path; +append_query(Path, Query) -> + Path ++ "?" ++ binary_to_list(query_string(Query)). + query_string(Body) -> - query_string(maps:to_list(Body), []). + query_string(Body, []). query_string([], Acc) -> - <<$&, Str/binary>> = iolist_to_binary(lists:reverse(Acc)), - Str; + case iolist_to_binary(lists:reverse(Acc)) of + <<$&, Str/binary>> -> + Str; + <<>> -> + <<>> + end; query_string([{K, V} | More], Acc) -> query_string( More , [ ["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)] @@ -121,30 +159,34 @@ serialize_body(<<"application/json">>, Body) -> serialize_body(<<"application/x-www-form-urlencoded">>, Body) -> query_string(Body). -replvar(Str0, PubSub, Topic, - #{username := Username, - clientid := Clientid, - peerhost := IpAddress, - protocol := Protocol, - mountpoint := Mountpoint - }) when is_list(Str0); - is_binary(Str0) -> - NTopic = emqx_http_lib:uri_encode(Topic), - Str1 = re:replace( Str0, emqx_authz:ph_to_re(?PH_S_CLIENTID) - , bin(Clientid), [global, {return, binary}]), - Str2 = re:replace( Str1, emqx_authz:ph_to_re(?PH_S_USERNAME) - , bin(Username), [global, {return, binary}]), - Str3 = re:replace( Str2, emqx_authz:ph_to_re(?PH_S_HOST) - , inet_parse:ntoa(IpAddress), [global, {return, binary}]), - Str4 = re:replace( Str3, emqx_authz:ph_to_re(?PH_S_PROTONAME) - , bin(Protocol), [global, {return, binary}]), - Str5 = re:replace( Str4, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT) - , bin(Mountpoint), [global, {return, binary}]), - Str6 = re:replace( Str5, emqx_authz:ph_to_re(?PH_S_TOPIC) - , bin(NTopic), [global, {return, binary}]), - Str7 = re:replace( Str6, emqx_authz:ph_to_re(?PH_S_ACTION) - , bin(PubSub), [global, {return, binary}]), - Str7. +replace_placeholders(KVs, PubSub, Topic, Client) -> + replace_placeholders(KVs, PubSub, Topic, Client, []). + +replace_placeholders([], _PubSub, _Topic, _Client, Acc) -> + lists:reverse(Acc); +replace_placeholders([{K, V0} | More], PubSub, Topic, Client, Acc) -> + case replace_placeholder(V0, PubSub, Topic, Client) of + undefined -> + error({cannot_get_variable, V0}); + V -> + replace_placeholders(More, PubSub, Topic, Client, [{bin(K), bin(V)} | Acc]) + end. + +replace_placeholder(?PH_USERNAME, _PubSub, _Topic, Client) -> + bin(maps:get(username, Client, undefined)); +replace_placeholder(?PH_CLIENTID, _PubSub, _Topic, Client) -> + bin(maps:get(clientid, Client, undefined)); +replace_placeholder(?PH_HOST, _PubSub, _Topic, Client) -> + inet_parse:ntoa(maps:get(peerhost, Client, undefined)); +replace_placeholder(?PH_PROTONAME, _PubSub, _Topic, Client) -> + bin(maps:get(protocol, Client, undefined)); +replace_placeholder(?PH_TOPIC, _PubSub, Topic, _Client) -> + bin(emqx_http_lib:uri_encode(Topic)); +replace_placeholder(?PH_ACTION, PubSub, _Topic, _Client) -> + bin(PubSub); + +replace_placeholder(Constant, _, _, _) -> + Constant. bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B; diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 4f7788849..752d656ba 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -17,17 +17,14 @@ -module(emqx_authz_schema). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). -reflect_type([ permission/0 , action/0 - , url/0 ]). --typerefl_from_string({url/0, emqx_http_lib, uri_parse}). - -type action() :: publish | subscribe | all. -type permission() :: allow | deny. --type url() :: emqx_http_lib:uri_map(). -export([ namespace/0 , roots/0 @@ -143,7 +140,7 @@ fields(redis_cluster) -> http_common_fields() -> [ {type, #{type => http}} , {enable, #{type => boolean(), default => true}} - , {url, #{type => url()}} + , {url, fun url/1} , {request_timeout, mk_duration("request timeout", #{default => "30s"})} , {body, #{type => map(), nullable => true}} ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). @@ -177,6 +174,11 @@ headers_no_content_type(converter) -> headers_no_content_type(default) -> default_headers_no_content_type(); headers_no_content_type(_) -> undefined. +url(type) -> binary(); +url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")]; +url(nullable) -> false; +url(_) -> undefined. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- From 6affb5aca1993249ad6917bcbb0eaaa970f5ed6e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 30 Dec 2021 22:19:35 +0800 Subject: [PATCH 07/29] fix(authn): authn http resource url query string --- .../src/simple_authn/emqx_authn_http.erl | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 533301ea7..3aac29b41 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -19,6 +19,7 @@ -include("emqx_authn.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). -behaviour(hocon_schema). -behaviour(emqx_authentication). @@ -77,7 +78,7 @@ validations() -> ]. url(type) -> binary(); -url(validator) -> [fun check_url/1]; +url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")]; url(nullable) -> false; url(_) -> undefined. @@ -118,16 +119,16 @@ create(_AuthenticatorID, Config) -> create(Config). create(#{method := Method, - url := URL, + url := RawURL, headers := Headers, body := Body, request_timeout := RequestTimeout} = Config) -> - #{path := Path, - query := Query} = URIMap = parse_url(URL), + {BsaeUrlWithPath, Query} = parse_fullpath(RawURL), + URIMap = parse_url(BsaeUrlWithPath), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), State = #{method => Method, - path => Path, - base_query => cow_qs:parse_qs(list_to_binary(Query)), + path => maps:get(path, URIMap), + base_query => cow_qs:parse_qs(to_bin(Query)), headers => maps:to_list(Headers), body => maps:to_list(Body), request_timeout => RequestTimeout, @@ -204,11 +205,8 @@ destroy(#{resource_id := ResourceId}) -> %% Internal functions %%-------------------------------------------------------------------- -check_url(URL) -> - case emqx_http_lib:uri_parse(URL) of - {ok, _} -> true; - {error, _} -> false - end. +parse_fullpath(RawURL) -> + cow_http:parse_fullpath(to_bin(RawURL)). check_body(Body) -> lists:all( @@ -234,7 +232,8 @@ transform_header_name(Headers) -> end, #{}, Headers). check_ssl_opts(Conf) -> - case parse_url(get_conf_val("url", Conf)) of + {BaseUrlWithPath, _Query} = parse_fullpath(get_conf_val("url", Conf)), + case parse_url(BaseUrlWithPath) of #{scheme := https} -> case get_conf_val("ssl.enable", Conf) of true -> ok; @@ -264,12 +263,13 @@ generate_request(Credential, #{method := Method, headers := Headers, body := Body0}) -> Body = replace_placeholders(Body0, Credential), + NBaseQuery = replace_placeholders(BaseQuery, Credential), case Method of get -> - NPath = append_query(Path, BaseQuery ++ Body), + NPath = append_query(Path, NBaseQuery ++ Body), {NPath, Headers}; post -> - NPath = append_query(Path, BaseQuery), + NPath = append_query(Path, NBaseQuery), ContentType = proplists:get_value(<<"content-type">>, Headers), NBody = serialize_body(ContentType, Body), {NPath, Headers, NBody} From fa25991c5cf0e30e55031d47b15eda284a86cf65 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 30 Dec 2021 23:34:48 +0800 Subject: [PATCH 08/29] test(authz): authnz acl query string use placehodler --- apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index a448fa25f..c6ad0f098 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -29,7 +29,7 @@ -define(SOURCE1, #{<<"type">> => <<"http">>, <<"enable">> => true, - <<"url">> => <<"https://fake.com:443/">>, + <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>, <<"headers">> => #{}, <<"method">> => <<"get">>, <<"request_timeout">> => <<"5s">> From 7bc59969eb9c0f8245d3e4a3a48feb415ef88986 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 4 Jan 2022 11:01:14 +0800 Subject: [PATCH 09/29] fix(prometheus): update config in cluster --- .../src/emqx_prometheus_api.erl | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index 9e8904a4e..f849589f4 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -67,16 +67,22 @@ prometheus(get, _Params) -> {200, emqx:get_raw_config([<<"prometheus">>], #{})}; prometheus(put, #{body := Body}) -> - {ok, Config} = emqx:update_config([prometheus], Body), - case maps:get(<<"enable">>, Body) of - true -> - _ = emqx_prometheus_sup:stop_child(?APP), - emqx_prometheus_sup:start_child(?APP, maps:get(config, Config)); - false -> - _ = emqx_prometheus_sup:stop_child(?APP), - ok - end, - {200, emqx:get_raw_config([<<"prometheus">>], #{})}. + case emqx:update_config([prometheus], + Body, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfig, config := Config}} -> + case maps:get(<<"enable">>, Body) of + true -> + _ = emqx_prometheus_sup:stop_child(?APP), + emqx_prometheus_sup:start_child(?APP, Config); + false -> + _ = emqx_prometheus_sup:stop_child(?APP) + end, + {200, NewConfig}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, 'INTERNAL_ERROR', Message} + end. stats(get, #{headers := Headers}) -> Type = From e908973a867d84068aca92cd2e7abdd45cf1fba3 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 4 Jan 2022 11:07:58 +0800 Subject: [PATCH 10/29] fix(statsd): update config bug --- apps/emqx_statsd/src/emqx_statsd_api.erl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index ab9416266..c7ac94003 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -61,10 +61,8 @@ statsd(put, #{body := Body}) -> {ok, #{raw_config := NewConfig, config := Config}} -> ok = emqx_statsd_sup:ensure_child_stopped(?APP), case maps:get(<<"enable">>, Body) of - true -> - ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config)); - false -> - ok + true -> emqx_statsd_sup:ensure_child_started(?APP, Config); + false -> ok end, {200, NewConfig}; {error, Reason} -> From 20c5343f9bd383c4f0808e093c68058fc353fe47 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 4 Jan 2022 13:57:23 +0800 Subject: [PATCH 11/29] fix(prometheus): dialyzer --- apps/emqx_prometheus/src/emqx_prometheus_api.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index f849589f4..dda8bbd13 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -71,12 +71,12 @@ prometheus(put, #{body := Body}) -> Body, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfig, config := Config}} -> - case maps:get(<<"enable">>, Body) of + case maps:get(<<"enable">>, Body, true) of true -> - _ = emqx_prometheus_sup:stop_child(?APP), - emqx_prometheus_sup:start_child(?APP, Config); + ok = emqx_prometheus_sup:stop_child(?APP), + ok = emqx_prometheus_sup:start_child(?APP, Config); false -> - _ = emqx_prometheus_sup:stop_child(?APP) + ok = emqx_prometheus_sup:stop_child(?APP) end, {200, NewConfig}; {error, Reason} -> From eabede04f3ab29771f29ba48377844fd28e1dc21 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 4 Jan 2022 15:00:50 +0800 Subject: [PATCH 12/29] fix(test): emqx_connector_api_SUITE add sleep --- apps/emqx_connector/test/emqx_connector_api_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 12a3a8e23..4ecc61cec 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -221,6 +221,7 @@ t_mqtt_conn_bridge_ingress(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now + timer:sleep(50), {ok, 201, Bridge} = request(post, uri(["bridges"]), ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, From 21bf07a01cbc72f7bf29e166e3eee508a8f7b818 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 5 Jan 2022 10:38:54 +0800 Subject: [PATCH 13/29] fix(test): race conditions in connector_api_SUITE --- .../test/emqx_connector_api_SUITE.erl | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 12a3a8e23..ef4de9bbf 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -226,22 +226,20 @@ t_mqtt_conn_bridge_ingress(_) -> <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_INGRESS }), - #{ <<"id">> := BridgeIDIngress , <<"type">> := <<"mqtt">> - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDIngress, 5), %% we now test if the bridge works as expected - RemoteTopic = <<"remote_topic/1">>, LocalTopic = <<"local_topic/", RemoteTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), + timer:sleep(100), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -295,22 +293,21 @@ t_mqtt_conn_bridge_egress(_) -> <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - #{ <<"id">> := BridgeIDEgress , <<"type">> := ?CONNECTR_TYPE , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 5), %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), + timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic @@ -369,10 +366,9 @@ t_mqtt_conn_update(_) -> #{ <<"id">> := BridgeIDEgress , <<"type">> := <<"mqtt">> , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), - wait_for_resource_ready(BridgeIDEgress, 2), + wait_for_resource_ready(BridgeIDEgress, 5), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' @@ -424,6 +420,7 @@ t_mqtt_conn_update2(_) -> {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + wait_for_resource_ready(BridgeIDEgress, 5), ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"status">> := <<"connected">> }, jsx:decode(BridgeStr)), @@ -454,7 +451,7 @@ t_mqtt_conn_update3(_) -> #{ <<"id">> := BridgeIDEgress , <<"connector">> := ConnctorID } = jsx:decode(Bridge), - wait_for_resource_ready(BridgeIDEgress, 2), + wait_for_resource_ready(BridgeIDEgress, 5), %% delete the connector should fail because it is in use by a bridge {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), @@ -505,6 +502,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> LocalTopic = <<"local_topic/", RemoteTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), + timer:sleep(100), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. wait_for_resource_ready(BridgeIDIngress, 5), @@ -570,6 +568,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), + timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. wait_for_resource_ready(BridgeIDEgress, 5), @@ -593,6 +592,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RuleTopic = <<"t/1">>, RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, emqx:subscribe(RemoteTopic2), + timer:sleep(100), wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(RuleTopic, Payload2)), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), From 2c1f4d4860d24e833f076074075d051e60227b92 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 5 Jan 2022 12:52:26 +0800 Subject: [PATCH 14/29] chore(dashboard): update dashboard version to v0.17.0 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index c32f1db73..7059586cd 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-2:23.3.4.9-3-alpine3 export EMQX_DEFAULT_RUNNER = alpine:3.14 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) -export EMQX_DASHBOARD_VERSION ?= v0.16.0 +export EMQX_DASHBOARD_VERSION ?= v0.17.0 export DOCKERFILE := deploy/docker/Dockerfile export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing ifeq ($(OS),Windows_NT) From 7ae3c9389204ce1adfd806f14b55ff81123d9248 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 5 Jan 2022 14:37:25 +0800 Subject: [PATCH 15/29] chore(test): update api test script version --- .github/workflows/run_api_tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run_api_tests.yaml b/.github/workflows/run_api_tests.yaml index 796bcf451..084704537 100644 --- a/.github/workflows/run_api_tests.yaml +++ b/.github/workflows/run_api_tests.yaml @@ -61,7 +61,7 @@ jobs: - uses: actions/checkout@v2 with: repository: emqx/emqx-fvt - ref: 1.0.3-dev1 + ref: 1.0.3-dev2 path: . - uses: actions/setup-java@v1 with: From 72713cb85fc692bc5b6284c0b7c8eef483b669c4 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 15:05:53 +0800 Subject: [PATCH 16/29] fix(config): restart app after config update in cluster --- apps/emqx_prometheus/src/emqx_prometheus.erl | 61 ++++++++++++++++++- .../src/emqx_prometheus_api.erl | 13 +--- apps/emqx_statsd/src/emqx_statsd.erl | 58 ++++++++++++++++++ apps/emqx_statsd/src/emqx_statsd_api.erl | 11 +--- 4 files changed, 122 insertions(+), 21 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 109564929..bd15d3c87 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -22,9 +22,11 @@ %% be used by the prometheus application -behaviour(prometheus_collector). +-include("emqx_prometheus.hrl"). + -include_lib("prometheus/include/prometheus.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). - +-include_lib("emqx/include/logger.hrl"). -import(prometheus_model_helpers, [ create_mf/5 @@ -32,6 +34,16 @@ , counter_metric/1 ]). +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + % for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% APIs -export([start_link/1]). @@ -58,6 +70,53 @@ -record(state, {push_gateway, timer, interval}). +%%-------------------------------------------------------------------- +%% update new config +update(Config) -> + case emqx:update_config([prometheus], Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + case maps:get(<<"enable">>, Config, true) of + true -> + ok = restart(); + false -> + ok = stop() + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])). + +do_stop() -> + emqx_prometheus_sup:stop_child(?APP). + +do_restart() -> + case {stop(), start()} of + {ok, ok} -> + ok; + {Error1, Error2} -> + ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) + end. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index dda8bbd13..52120971b 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -67,17 +67,8 @@ prometheus(get, _Params) -> {200, emqx:get_raw_config([<<"prometheus">>], #{})}; prometheus(put, #{body := Body}) -> - case emqx:update_config([prometheus], - Body, - #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{raw_config := NewConfig, config := Config}} -> - case maps:get(<<"enable">>, Body, true) of - true -> - ok = emqx_prometheus_sup:stop_child(?APP), - ok = emqx_prometheus_sup:start_child(?APP, Config); - false -> - ok = emqx_prometheus_sup:stop_child(?APP) - end, + case emqx_prometheus:update(Body) of + {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 5f88e5bdd..4b7852997 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -25,6 +25,18 @@ -include("emqx_statsd.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ update/1 + , start/0 + , stop/0 + , restart/0 + %% for rpc + , do_start/0 + , do_stop/0 + , do_restart/0 + ]). + %% Interface -export([start_link/1]). @@ -44,6 +56,52 @@ estatsd_pid :: pid() }). +update(Config) -> + case emqx:update_config([statsd], + Config, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewConfigRows}} -> + start(), + case maps:get(<<"enable">>, Config) of + true -> stop(); + false -> ok + end, + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + + +start() -> cluster_call(do_start, []). +stop() -> cluster_call(do_stop, []). +restart() -> cluster_call(do_restart, []). + +do_start() -> + emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})). + +do_stop() -> + emqx_statsd_sup:ensure_child_stopped(?APP). + +do_restart() -> + case {stop(), start()} of + {ok, ok} -> + ok; + {Error1, Error2} -> + ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) + end. + +cluster_call(F, A) -> + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + +rpc_call(N, F, A) -> + case rpc:call(N, ?MODULE, F, A, 5000) of + {badrpc, R} -> + ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]), + {error, {badrpc, R}}; + Result -> + Result + end. + start_link(Opts) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index c7ac94003..ad28bae95 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -55,15 +55,8 @@ statsd(get, _Params) -> {200, emqx:get_raw_config([<<"statsd">>], #{})}; statsd(put, #{body := Body}) -> - case emqx:update_config([statsd], - Body, - #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{raw_config := NewConfig, config := Config}} -> - ok = emqx_statsd_sup:ensure_child_stopped(?APP), - case maps:get(<<"enable">>, Body) of - true -> emqx_statsd_sup:ensure_child_started(?APP, Config); - false -> ok - end, + case emqx_statsd:update(Body) of + {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), From 303707d69d16d3f4847137287a0118a17dd7c182 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 5 Jan 2022 14:17:59 +0800 Subject: [PATCH 17/29] fix: make sure authz headers is binary. --- apps/emqx_authz/src/emqx_authz_http.erl | 4 +-- apps/emqx_authz/src/emqx_authz_schema.erl | 15 +++++----- .../src/emqx_connector_http.erl | 29 +++++++++++++------ .../src/emqx_dashboard_swagger.erl | 1 + 4 files changed, 31 insertions(+), 18 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index 3721aef28..5ee0a6d94 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -124,14 +124,14 @@ generate_request( PubSub case Method of get -> NPath = append_query(Path, NBaseQuery ++ Body), - {NPath, maps:to_list(Headers)}; + {NPath, Headers}; _ -> NPath = append_query(Path, NBaseQuery), NBody = serialize_body( maps:get(<<"Accept">>, Headers, <<"application/json">>), Body ), - {NPath, maps:to_list(Headers), NBody} + {NPath, Headers, NBody} end. append_query(Path, []) -> diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 752d656ba..d68d97cf3 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -33,6 +33,7 @@ ]). -import(emqx_schema, [mk_duration/2]). +-include_lib("hocon/include/hoconsc.hrl"). %%-------------------------------------------------------------------- %% Hocon Schema @@ -158,20 +159,20 @@ validations() -> , {check_headers, fun check_headers/1} ]. -headers(type) -> map(); +headers(type) -> list({binary(), binary()}); headers(converter) -> fun(Headers) -> - maps:merge(default_headers(), transform_header_name(Headers)) + maps:to_list(maps:merge(default_headers(), transform_header_name(Headers))) end; -headers(default) -> default_headers(); +headers(default) -> maps:to_list(default_headers()); headers(_) -> undefined. -headers_no_content_type(type) -> map(); +headers_no_content_type(type) -> list({binary(), binary()}); headers_no_content_type(converter) -> fun(Headers) -> - maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) + maps:to_list(maps:merge(default_headers_no_content_type(), transform_header_name(Headers))) end; -headers_no_content_type(default) -> default_headers_no_content_type(); +headers_no_content_type(default) -> maps:to_list(default_headers_no_content_type()); headers_no_content_type(_) -> undefined. url(type) -> binary(); @@ -221,7 +222,7 @@ check_headers(Conf) check_headers(Conf) -> Method = to_bin(hocon_schema:get_value("config.method", Conf)), Headers = hocon_schema:get_value("config.headers", Conf), - Method =:= <<"post">> orelse (not maps:is_key(<<"content-type">>, Headers)). + Method =:= <<"post">> orelse (not lists:member(<<"content-type">>, Headers)). union_array(Item) when is_list(Item) -> hoconsc:array(hoconsc:union(Item)). diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 8b366070d..42bfe85b1 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -262,11 +262,20 @@ preprocess_request(#{ , request_timeout => maps:get(request_timeout, Req, 30000) }. -preproc_headers(Headers) -> +preproc_headers(Headers) when is_map(Headers) -> maps:fold(fun(K, V, Acc) -> - Acc#{emqx_plugin_libs_rule:preproc_tmpl(bin(K)) => - emqx_plugin_libs_rule:preproc_tmpl(bin(V))} - end, #{}, Headers). + [{ + emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + emqx_plugin_libs_rule:preproc_tmpl(bin(V)) + } | Acc] + end, [], Headers); +preproc_headers(Headers) when is_list(Headers) -> + lists:map(fun({K, V}) -> + { + emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + emqx_plugin_libs_rule:preproc_tmpl(bin(V)) + } + end, Headers). process_request(#{ method := MethodTks, @@ -278,7 +287,7 @@ process_request(#{ Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)) , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg) , body => process_request_body(BodyTks, Msg) - , headers => maps:to_list(proc_headers(HeadersTks, Msg)) + , headers => proc_headers(HeadersTks, Msg) , request_timeout => ReqTimeout }. @@ -288,10 +297,12 @@ process_request_body(BodyTks, Msg) -> emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). proc_headers(HeaderTks, Msg) -> - maps:fold(fun(K, V, Acc) -> - Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) => - emqx_plugin_libs_rule:proc_tmpl(V, Msg)} - end, #{}, HeaderTks). + lists:map(fun({K, V}) -> + { + emqx_plugin_libs_rule:proc_tmpl(K, Msg), + emqx_plugin_libs_rule:proc_tmpl(V, Msg) + } + end, HeaderTks). make_method(M) when M == <<"POST">>; M == <<"post">> -> post; make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index be8b4d074..1fbdb9eca 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -436,6 +436,7 @@ typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, examp typename_to_spec("bytesize()", _Mod) -> #{type => string, example => <<"32MB">>}; typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>}; typename_to_spec("map()", _Mod) -> #{type => object, example => #{}}; +typename_to_spec("{binary(), binary()}", _Mod) -> #{type => object, example => #{}}; typename_to_spec("comma_separated_list()", _Mod) -> #{type => string, example => <<"item1,item2">>}; typename_to_spec("comma_separated_atoms()", _Mod) -> From 47441fb747dfc7c0f40c28cab81a9a550df61cd0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 5 Jan 2022 15:49:44 +0800 Subject: [PATCH 18/29] fix(gw): fix bad default oom policy --- apps/emqx_gateway/src/emqx_gateway_utils.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index cc771ed95..638535b43 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -70,7 +70,7 @@ -define(DEFAULT_IDLE_TIMEOUT, 30000). -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). -define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304, - message_queue_len => 32000}). + max_message_queue_len => 32000}). -elvis([{elvis_style, god_modules, disable}]). From c6c7f70a3030a1b84c7cd07fa8dc1ec2432d4944 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 5 Jan 2022 15:57:19 +0800 Subject: [PATCH 19/29] chore(hocon): upgrade hocon to 0.22.3 --- apps/emqx/rebar.config | 2 +- rebar.config | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 31caa498d..3d016a461 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -17,7 +17,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.3"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}} diff --git a/rebar.config b/rebar.config index 61a3f4510..91a2aca1c 100644 --- a/rebar.config +++ b/rebar.config @@ -65,7 +65,7 @@ , {system_monitor, {git, "https://github.com/klarna-incubator/system_monitor", {tag, "2.2.0"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.3"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} From 31aed3ea8ecf5e5d41986c0aa0fd9e85cfe1d94b Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 15:59:17 +0800 Subject: [PATCH 20/29] fix(prometheus): stop app with error check & dialyzer --- apps/emqx_prometheus/src/emqx_prometheus.erl | 19 ++++++++++------- apps/emqx_statsd/src/emqx_statsd.erl | 22 ++++++++++---------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index bd15d3c87..8ffc1c0eb 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -95,18 +95,21 @@ do_start() -> emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])). do_stop() -> - emqx_prometheus_sup:stop_child(?APP). - -do_restart() -> - case {stop(), start()} of - {ok, ok} -> + case emqx_prometheus_sup:stop_child(?APP) of + ok -> ok; - {Error1, Error2} -> - ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) + {error, not_found} -> + ok end. +do_restart() -> + ok = do_start(), + ok = do_stop(), + ok. + cluster_call(F, A) -> - [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. rpc_call(N, F, A) -> case rpc:call(N, ?MODULE, F, A, 5000) of diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 4b7852997..99ba7c06e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -61,10 +61,12 @@ update(Config) -> Config, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfigRows}} -> - start(), - case maps:get(<<"enable">>, Config) of - true -> stop(); - false -> ok + _ = start(), + case maps:get(<<"enable">>, Config, true) of + true -> + ok = stop(); + false -> + ignore end, {ok, NewConfigRows}; {error, Reason} -> @@ -83,15 +85,13 @@ do_stop() -> emqx_statsd_sup:ensure_child_stopped(?APP). do_restart() -> - case {stop(), start()} of - {ok, ok} -> - ok; - {Error1, Error2} -> - ?LOG(error, "~p restart failed stop: ~p start: ~p", [?MODULE, Error1, Error2]) - end. + ok = do_start(), + ok = do_stop(), + ok. cluster_call(F, A) -> - [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()]. + [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()], + ok. rpc_call(N, F, A) -> case rpc:call(N, ?MODULE, F, A, 5000) of From 6c574c08b8cb63e9a45aa449aa5472b97a08e0ac Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 16:11:55 +0800 Subject: [PATCH 21/29] fix(config): update config by emqx_conf --- apps/emqx_prometheus/src/emqx_prometheus.erl | 2 +- apps/emqx_statsd/src/emqx_statsd.erl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 8ffc1c0eb..92513ed71 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -73,7 +73,7 @@ %%-------------------------------------------------------------------- %% update new config update(Config) -> - case emqx:update_config([prometheus], Config, + case emqx_conf:update([prometheus], Config, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfigRows}} -> case maps:get(<<"enable">>, Config, true) of diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 99ba7c06e..545019884 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -57,14 +57,14 @@ }). update(Config) -> - case emqx:update_config([statsd], + case emqx_conf:update([statsd], Config, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfigRows}} -> - _ = start(), + ok = stop(), case maps:get(<<"enable">>, Config, true) of true -> - ok = stop(); + ok = start(); false -> ignore end, From 5e48f55eefee95951ae09b380470a67fd1cd8eb6 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 17:32:09 +0800 Subject: [PATCH 22/29] fix(prometheus): restart error --- apps/emqx_prometheus/src/emqx_prometheus.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 92513ed71..1ba507ba4 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -103,8 +103,8 @@ do_stop() -> end. do_restart() -> - ok = do_start(), ok = do_stop(), + ok = do_start(), ok. cluster_call(F, A) -> From 4588ace79f0810ae9f507c6942a1d99fdc496722 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 5 Jan 2022 17:34:20 +0800 Subject: [PATCH 23/29] fix(statsd): restart error --- apps/emqx_statsd/src/emqx_statsd.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 545019884..7a7301405 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -85,8 +85,8 @@ do_stop() -> emqx_statsd_sup:ensure_child_stopped(?APP). do_restart() -> - ok = do_start(), ok = do_stop(), + ok = do_start(), ok. cluster_call(F, A) -> From 7e48a4e6f5f01c3d06edc4a40a66215493b37b68 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 5 Jan 2022 19:04:51 +0800 Subject: [PATCH 24/29] fix: authz default should be a map --- apps/emqx_authz/src/emqx_authz_schema.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index d68d97cf3..ef5a7c68b 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -164,7 +164,7 @@ headers(converter) -> fun(Headers) -> maps:to_list(maps:merge(default_headers(), transform_header_name(Headers))) end; -headers(default) -> maps:to_list(default_headers()); +headers(default) -> default_headers(); headers(_) -> undefined. headers_no_content_type(type) -> list({binary(), binary()}); @@ -172,7 +172,7 @@ headers_no_content_type(converter) -> fun(Headers) -> maps:to_list(maps:merge(default_headers_no_content_type(), transform_header_name(Headers))) end; -headers_no_content_type(default) -> maps:to_list(default_headers_no_content_type()); +headers_no_content_type(default) -> default_headers_no_content_type(); headers_no_content_type(_) -> undefined. url(type) -> binary(); From d3d240e8d54af605f800698f73e32a645753e785 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 5 Jan 2022 19:50:11 +0800 Subject: [PATCH 25/29] fix: revert hocon to 0.22.1 --- apps/emqx/rebar.config | 2 +- rebar.config | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 3d016a461..31caa498d 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -17,7 +17,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.3"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}} diff --git a/rebar.config b/rebar.config index 91a2aca1c..61a3f4510 100644 --- a/rebar.config +++ b/rebar.config @@ -65,7 +65,7 @@ , {system_monitor, {git, "https://github.com/klarna-incubator/system_monitor", {tag, "2.2.0"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.3"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} From f4f5281b166dbc923f1baed88da7005a1b8d6cc7 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 5 Jan 2022 22:06:50 +0800 Subject: [PATCH 26/29] fix: authz headers default value --- .../emqx_authz/src/emqx_authz_api_sources.erl | 19 ++++++++++++++++--- apps/emqx_authz/src/emqx_authz_schema.erl | 4 ++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index df5a6c819..d9699d968 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -412,9 +412,22 @@ get_raw_sources() -> RawSources = emqx:get_raw_config([authorization, sources], []), Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}}, Conf = #{<<"sources">> => RawSources}, - #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, - #{only_fill_defaults => true}), - Sources. + #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, #{only_fill_defaults => true}), + merge_default_headers(Sources). + +merge_default_headers(Sources) -> + lists:map(fun(Source) -> + Convert = + case Source of + #{<<"method">> := <<"get">>} -> + emqx_authz_schema:headers_no_content_type(converter); + #{<<"method">> := <<"post">>} -> + emqx_authz_schema:headers(converter); + _ -> fun(H) -> H end + end, + Headers = Convert(maps:get(<<"headers">>, Source, #{})), + Source#{<<"headers">> => Headers} + end, Sources). get_raw_source(Type) -> lists:filter(fun (#{<<"type">> := T}) -> diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index ef5a7c68b..998c5fca9 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -31,6 +31,10 @@ , fields/1 , validations/0 ]). +-export([ + headers_no_content_type/1, + headers/1 + ]). -import(emqx_schema, [mk_duration/2]). -include_lib("hocon/include/hoconsc.hrl"). From f4ed3ccdd72ce2313682caff69522c00f177c64c Mon Sep 17 00:00:00 2001 From: zhouzb Date: Wed, 5 Jan 2022 23:30:06 +0800 Subject: [PATCH 27/29] chore(release): update emqx release version --- apps/emqx/include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 1424eb8a5..7a1310f88 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -24,4 +24,4 @@ %% NOTE: This version number should be manually bumped for each release --define(EMQX_RELEASE, "5.0-beta.2"). +-define(EMQX_RELEASE, "5.0-beta.3"). From 517b634409e82986616eda38c31c2249a64778c2 Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Thu, 6 Jan 2022 09:17:06 +0800 Subject: [PATCH 28/29] build(windows): do not compile doc for windows --- rebar.config.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rebar.config.erl b/rebar.config.erl index dbaf9d6a3..3c14f8311 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -122,42 +122,42 @@ profiles() -> , {relx, relx(Vsn, cloud, bin, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx doc"}]} ]} , {'emqx-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, cloud, pkg, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx-pkg doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-pkg doc"}]} ]} , {'emqx-enterprise', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, cloud, bin, ee)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ee)} - , {post_hooks, [{compile, "./build emqx-enterprise doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise doc"}]} ]} , {'emqx-enterprise-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, cloud, pkg, ee)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ee)} - , {post_hooks, [{compile, "./build emqx-enterprise-pkg doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise-pkg doc"}]} ]} , {'emqx-edge', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, edge, bin, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx-edge doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge doc"}]} ]} , {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx(Vsn, edge, pkg, ce)} , {overrides, prod_overrides()} , {project_app_dirs, project_app_dirs(ce)} - , {post_hooks, [{compile, "./build emqx-edge-pkg doc"}]} + , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge-pkg doc"}]} ]} , {check, [ {erl_opts, common_compile_opts()} From 552165db26e610ed1ec4df0346f932cff3abf44c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 6 Jan 2022 15:05:52 +0800 Subject: [PATCH 29/29] test(authz): fix test suite based on release-5.0-beta.3 --- apps/emqx_authz/test/emqx_authz_SUITE.erl | 6 +- .../emqx_authz/test/emqx_authz_http_SUITE.erl | 119 +++++------------- 2 files changed, 30 insertions(+), 95 deletions(-) diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index ff8a99b5c..17c9ca519 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -64,9 +64,7 @@ set_special_configs(_App) -> -define(SOURCE1, #{<<"type">> => <<"http">>, <<"enable">> => true, - <<"base_url">> => <<"https://example.com:443/">>, - <<"path">> => <<"a/b">>, - <<"query">> => <<"c=d">>, + <<"url">> => <<"https://example.com:443/a/b?c=d">>, <<"headers">> => #{}, <<"method">> => <<"get">>, <<"request_timeout">> => 5000 diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index 9a3b86958..40c5f15ba 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -156,49 +156,14 @@ t_query_params(_Config) -> Req = cowboy_req:reply(200, Req0), {ok, Req, State} end, - #{<<"query">> => <<"username=${username}&" - "clientid=${clientid}&" - "peerhost=${peerhost}&" - "proto_name=${proto_name}&" - "mountpoint=${mountpoint}&" - "topic=${topic}&" - "action=${action}">> - }), - - ClientInfo = #{clientid => <<"client id">>, - username => <<"user name">>, - peerhost => {127,0,0,1}, - protocol => <<"MQTT">>, - mountpoint => <<"MOUNTPOINT">>, - zone => default, - listener => {tcp, default} - }, - - ?assertEqual( - allow, - emqx_access_control:authorize(ClientInfo, publish, <<"t">>)). - -t_path_params(_Config) -> - ok = setup_handler_and_config( - fun(Req0, State) -> - <<"/authz/" - "username/user%20name/" - "clientid/client%20id/" - "peerhost/127.0.0.1/" - "proto_name/MQTT/" - "mountpoint/MOUNTPOINT/" - "topic/t/" - "action/publish">> = cowboy_req:path(Req0), - Req = cowboy_req:reply(200, Req0), - {ok, Req, State} - end, - #{<<"path">> => <<"username/${username}/" - "clientid/${clientid}/" - "peerhost/${peerhost}/" - "proto_name/${proto_name}/" - "mountpoint/${mountpoint}/" - "topic/${topic}/" - "action/${action}">> + #{<<"url">> => <<"http://127.0.0.1:33333/authz/users/?" + "username=${username}&" + "clientid=${clientid}&" + "peerhost=${host}&" + "proto_name=${proto_name}&" + "mountpoint=${mountpoint}&" + "topic=${topic}&" + "action=${action}">> }), ClientInfo = #{clientid => <<"client id">>, @@ -218,23 +183,16 @@ t_json_body(_Config) -> ok = setup_handler_and_config( fun(Req0, State) -> ?assertEqual( - <<"/authz/" - "username/user%20name/" - "clientid/client%20id/" - "peerhost/127.0.0.1/" - "proto_name/MQTT/" - "mountpoint/MOUNTPOINT/" - "topic/t/" - "action/publish">>, + <<"/authz/users/">>, cowboy_req:path(Req0)), {ok, RawBody, Req1} = cowboy_req:read_body(Req0), ?assertMatch( #{<<"username">> := <<"user name">>, - <<"CLIENT_client id">> := <<"client id">>, - <<"peerhost">> := [<<"127.0.0.1">>, 1], - <<"proto_name">> := #{<<"proto">> := <<"MQTT">>}, + <<"CLIENT">> := <<"client id">>, + <<"peerhost">> := <<"127.0.0.1">>, + <<"proto_name">> := <<"MQTT">>, <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>}, @@ -244,17 +202,10 @@ t_json_body(_Config) -> {ok, Req, State} end, #{<<"method">> => <<"post">>, - <<"path">> => <<"username/${username}/" - "clientid/${clientid}/" - "peerhost/${peerhost}/" - "proto_name/${proto_name}/" - "mountpoint/${mountpoint}/" - "topic/${topic}/" - "action/${action}">>, <<"body">> => #{<<"username">> => <<"${username}">>, - <<"CLIENT_${clientid}">> => <<"${clientid}">>, - <<"peerhost">> => [<<"${peerhost}">>, 1], - <<"proto_name">> => #{<<"proto">> => <<"${proto_name}">>}, + <<"CLIENT">> => <<"${clientid}">>, + <<"peerhost">> => <<"${host}">>, + <<"proto_name">> => <<"${proto_name}">>, <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, <<"action">> => <<"${action}">>} @@ -278,17 +229,10 @@ t_form_body(_Config) -> ok = setup_handler_and_config( fun(Req0, State) -> ?assertEqual( - <<"/authz/" - "username/user%20name/" - "clientid/client%20id/" - "peerhost/127.0.0.1/" - "proto_name/MQTT/" - "mountpoint/MOUNTPOINT/" - "topic/t/" - "action/publish">>, + <<"/authz/users/">>, cowboy_req:path(Req0)), - - {ok, PostVars, Req1} = cowboy_req:read_urlencoded_body(Req0), + + {ok, [{PostVars, true}], Req1} = cowboy_req:read_urlencoded_body(Req0), ?assertMatch( #{<<"username">> := <<"user name">>, @@ -298,22 +242,15 @@ t_form_body(_Config) -> <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>}, - maps:from_list(PostVars)), + jiffy:decode(PostVars, [return_maps])), Req = cowboy_req:reply(200, Req1), {ok, Req, State} end, #{<<"method">> => <<"post">>, - <<"path">> => <<"username/${username}/" - "clientid/${clientid}/" - "peerhost/${peerhost}/" - "proto_name/${proto_name}/" - "mountpoint/${mountpoint}/" - "topic/${topic}/" - "action/${action}">>, <<"body">> => #{<<"username">> => <<"${username}">>, <<"clientid">> => <<"${clientid}">>, - <<"peerhost">> => <<"${peerhost}">>, + <<"peerhost">> => <<"${host}">>, <<"proto_name">> => <<"${proto_name}">>, <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, @@ -349,7 +286,8 @@ t_create_replace(_Config) -> Req = cowboy_req:reply(200, Req0), {ok, Req, State} end, - #{<<"base_url">> => <<"http://127.0.0.1:33333/authz">>}), + #{<<"url">> => + <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>}), ?assertEqual( allow, @@ -358,7 +296,8 @@ t_create_replace(_Config) -> %% Changing to other bad config does not work BadConfig = maps:merge( raw_http_authz_config(), - #{<<"base_url">> => <<"http://127.0.0.1:33332/authz">>}), + #{<<"url">> => + <<"http://127.0.0.1:33332/authz/users/?topic=${topic}&action=${action}">>}), ?assertMatch( {error, _}, @@ -371,7 +310,8 @@ t_create_replace(_Config) -> %% Changing to valid config OkConfig = maps:merge( raw_http_authz_config(), - #{<<"base_url">> => <<"http://127.0.0.1:33333/authz">>}), + #{<<"url">> => + <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>}), ?assertMatch( {ok, _}, @@ -388,12 +328,9 @@ t_create_replace(_Config) -> raw_http_authz_config() -> #{ <<"enable">> => <<"true">>, - <<"type">> => <<"http">>, <<"method">> => <<"get">>, - <<"base_url">> => <<"http://127.0.0.1:33333/authz">>, - <<"path">> => <<"users/${username}/">>, - <<"query">> => <<"topic=${topic}&action=${action}">>, + <<"url">> => <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>, <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>} }.