From 9b085b57979e792c0714abe4dc5dabf3a4c13173 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 21 Mar 2024 17:30:36 +0800 Subject: [PATCH 1/3] fix(api_schema): removed metrics schema in api spec - Followup [PR#6622](https://github.com/emqx/emqx/pull/6622). --- apps/emqx_management/src/emqx_mgmt_api_metrics.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 8d61ee1fb..f2e302569 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -264,10 +264,10 @@ properties() -> "messages.qos0.received\fmessages.qos1.received and messages.qos2.received" >> ), - m( - 'messages.retained', - <<"Number of retained messages">> - ), + %% m( + %% 'messages.retained', + %% <<"Number of retained messages">> + %% ), m( 'messages.sent', << From 42faffc320b97d64df3ab0759c7bff7499a64855 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Mar 2024 18:03:13 +0800 Subject: [PATCH 2/3] refactor: uniform shared_sub table macros --- apps/emqx/include/emqx_shared_sub.hrl | 28 ++++++++++++ apps/emqx/src/emqx_shared_sub.erl | 66 +++++++++++++-------------- 2 files changed, 60 insertions(+), 34 deletions(-) create mode 100644 apps/emqx/include/emqx_shared_sub.hrl diff --git a/apps/emqx/include/emqx_shared_sub.hrl b/apps/emqx/include/emqx_shared_sub.hrl new file mode 100644 index 000000000..d744bd8a8 --- /dev/null +++ b/apps/emqx/include/emqx_shared_sub.hrl @@ -0,0 +1,28 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_SHARED_SUB_HRL). +-define(EMQX_SHARED_SUB_HRL, true). + +%% Mnesia table for shared sub message routing +-define(SHARED_SUBSCRIPTION, emqx_shared_subscription). + +%% ETS tables for Shared PubSub +-define(SHARED_SUBSCRIBER, emqx_shared_subscriber). +-define(ALIVE_SHARED_SUBSCRIBERS, emqx_alive_shared_subscribers). +-define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter). + +-endif. diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ce694ba33..54c107111 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -21,6 +21,7 @@ -include("emqx_schema.hrl"). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx_shared_sub.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -84,10 +85,7 @@ | hash_topic. -define(SERVER, ?MODULE). --define(TAB, emqx_shared_subscription). --define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter). --define(SHARED_SUBS, emqx_shared_subscriber). --define(ALIVE_SUBS, emqx_alive_shared_subscribers). + -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5). -define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())). -define(ACK, shared_sub_ack). @@ -99,21 +97,21 @@ -record(state, {pmon}). --record(emqx_shared_subscription, {group, topic, subpid}). +-record(?SHARED_SUBSCRIPTION, {group, topic, subpid}). %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- create_tables() -> - ok = mria:create_table(?TAB, [ + ok = mria:create_table(?SHARED_SUBSCRIPTION, [ {type, bag}, {rlog_shard, ?SHARED_SUB_SHARD}, {storage, ram_copies}, - {record_name, emqx_shared_subscription}, - {attributes, record_info(fields, emqx_shared_subscription)} + {record_name, ?SHARED_SUBSCRIPTION}, + {attributes, record_info(fields, ?SHARED_SUBSCRIPTION)} ]), - [?TAB]. + [?SHARED_SUBSCRIPTION]. %%-------------------------------------------------------------------- %% API @@ -132,7 +130,7 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). record(Group, Topic, SubPid) -> - #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. + #?SHARED_SUBSCRIPTION{group = Group, topic = Topic, subpid = SubPid}. -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). @@ -394,18 +392,18 @@ subscribers(Group, Topic, FailedSubs) -> %% Select ETS table to get all subscriber pids. subscribers(Group, Topic) -> - ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). + ets:select(?SHARED_SUBSCRIPTION, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> - ok = mria:wait_for_tables([?TAB]), - {ok, _} = mnesia:subscribe({table, ?TAB, simple}), + ok = mria:wait_for_tables([?SHARED_SUBSCRIPTION]), + {ok, _} = mnesia:subscribe({table, ?SHARED_SUBSCRIPTION, simple}), {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0), - ok = emqx_utils_ets:new(?SHARED_SUBS, [protected, bag]), - ok = emqx_utils_ets:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), + ok = emqx_utils_ets:new(?SHARED_SUBSCRIBER, [protected, bag]), + ok = emqx_utils_ets:new(?ALIVE_SHARED_SUBSCRIBERS, [protected, set, {read_concurrency, true}]), ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [ public, set, {write_concurrency, true} ]), @@ -413,26 +411,26 @@ init([]) -> init_monitors() -> mnesia:foldl( - fun(#emqx_shared_subscription{subpid = SubPid}, Mon) -> + fun(#?SHARED_SUBSCRIPTION{subpid = SubPid}, Mon) -> emqx_pmon:monitor(SubPid, Mon) end, emqx_pmon:new(), - ?TAB + ?SHARED_SUBSCRIPTION ). handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> - mria:dirty_write(?TAB, record(Group, Topic, SubPid)), - case ets:member(?SHARED_SUBS, {Group, Topic}) of + mria:dirty_write(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)), + case ets:member(?SHARED_SUBSCRIBER, {Group, Topic}) of true -> ok; false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) end, ok = maybe_insert_alive_tab(SubPid), ok = maybe_insert_round_robin_count({Group, Topic}), - true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), + true = ets:insert(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}), {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> - mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), - true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), + mria:dirty_delete_object(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)), + true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), maybe_delete_round_robin_count({Group, Topic}), {reply, ok, update_stats(State)}; @@ -445,7 +443,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info( - {mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}}, + {mnesia_table_event, {write, #?SHARED_SUBSCRIPTION{subpid = SubPid}, _}}, State = #state{pmon = PMon} ) -> ok = maybe_insert_alive_tab(SubPid), @@ -455,7 +453,7 @@ handle_info( %% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually %% be disconnected. % handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> -% #emqx_shared_subscription{subpid = SubPid} = OldRecord, +% #?SHARED_SUBSCRIPTION{subpid = SubPid} = OldRecord, % {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; handle_info({mnesia_table_event, _Event}, State) -> @@ -468,7 +466,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> - mnesia:unsubscribe({table, ?TAB, simple}). + mnesia:unsubscribe({table, ?SHARED_SUBSCRIPTION, simple}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -501,7 +499,7 @@ maybe_delete_round_robin_count({Group, _Topic} = GroupTopic) -> ok. if_no_more_subscribers(GroupTopic, Fn) -> - case ets:member(?SHARED_SUBS, GroupTopic) of + case ets:member(?SHARED_SUBSCRIBER, GroupTopic) of true -> ok; false -> Fn() end, @@ -510,26 +508,26 @@ if_no_more_subscribers(GroupTopic, Fn) -> %% keep track of alive remote pids maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok; maybe_insert_alive_tab(Pid) when is_pid(Pid) -> - ets:insert(?ALIVE_SUBS, {Pid}), + ets:insert(?ALIVE_SHARED_SUBSCRIBERS, {Pid}), ok. cleanup_down(SubPid) -> - ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), + ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SHARED_SUBSCRIBERS, SubPid), lists:foreach( - fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> - ok = mria:dirty_delete_object(?TAB, Record), - true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), + fun(Record = #?SHARED_SUBSCRIPTION{topic = Topic, group = Group}) -> + ok = mria:dirty_delete_object(?SHARED_SUBSCRIPTION, Record), + true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}), maybe_delete_round_robin_count({Group, Topic}), delete_route_if_needed({Group, Topic}) end, - mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid}) + mnesia:dirty_match_object(#?SHARED_SUBSCRIPTION{_ = '_', subpid = SubPid}) ). update_stats(State) -> emqx_stats:setstat( 'subscriptions.shared.count', 'subscriptions.shared.max', - ets:info(?TAB, size) + ets:info(?SHARED_SUBSCRIPTION, size) ), State. @@ -543,7 +541,7 @@ is_active_sub(Pid, FailedSubs, All) -> is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> erlang:is_process_alive(Pid); is_alive_sub(Pid) -> - [] =/= ets:lookup(?ALIVE_SUBS, Pid). + [] =/= ets:lookup(?ALIVE_SHARED_SUBSCRIBERS, Pid). delete_route_if_needed({Group, Topic} = GroupTopic) -> if_no_more_subscribers(GroupTopic, fun() -> From 5f7f9e43f97187b80fa7b35e89ccf114e608fae8 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Mar 2024 18:31:57 +0800 Subject: [PATCH 3/3] fix(stats): `'subscribers.count'` contains shared-subscriber --- apps/emqx/src/emqx_broker.erl | 18 ---------------- apps/emqx/src/emqx_broker_helper.erl | 31 +++++++++++++++++++++++++++- apps/emqx/test/emqx_broker_SUITE.erl | 2 +- apps/emqx/test/emqx_stats_SUITE.erl | 6 +++--- changes/ce/fix-12765.en.md | 2 ++ 5 files changed, 36 insertions(+), 23 deletions(-) create mode 100644 changes/ce/fix-12765.en.md diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 6f6580517..1470b7d8b 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -60,9 +60,6 @@ -export([topics/0]). -%% Stats fun --export([stats_fun/0]). - %% gen_server callbacks -export([ init/1, @@ -475,21 +472,6 @@ set_subopts(SubPid, Topic, NewOpts) -> topics() -> emqx_router:topics(). -%%-------------------------------------------------------------------- -%% Stats fun -%%-------------------------------------------------------------------- - -stats_fun() -> - safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'), - safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'), - safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max'). - -safe_update_stats(Tab, Stat, MaxStat) -> - case ets:info(Tab, size) of - undefined -> ok; - Size -> emqx_stats:setstat(Stat, MaxStat, Size) - end. - %%-------------------------------------------------------------------- %% call, cast, pick %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 8562a1968..368398b92 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -18,6 +18,8 @@ -behaviour(gen_server). +-include("emqx_router.hrl"). +-include("emqx_shared_sub.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -33,6 +35,9 @@ reclaim_seq/1 ]). +%% Stats fun +-export([stats_fun/0]). + %% gen_server callbacks -export([ init/1, @@ -99,6 +104,30 @@ create_seq(Topic) -> reclaim_seq(Topic) -> emqx_sequence:reclaim(?SUBSEQ, Topic). +%%-------------------------------------------------------------------- +%% Stats fun +%%-------------------------------------------------------------------- + +stats_fun() -> + safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'), + safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'), + safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max'). + +safe_update_stats(undefined, _Stat, _MaxStat) -> + ok; +safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) -> + emqx_stats:setstat(Stat, MaxStat, Val). + +subscriber_val() -> + sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)). + +sum_subscriber(undefined, undefined) -> undefined; +sum_subscriber(undefined, V2) when is_integer(V2) -> V2; +sum_subscriber(V1, undefined) when is_integer(V1) -> V1; +sum_subscriber(V1, V2) when is_integer(V1), is_integer(V2) -> V1 + V2. + +table_size(Tab) when is_atom(Tab) -> ets:info(Tab, size). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -115,7 +144,7 @@ init([]) -> %% SubMon: SubPid -> SubId ok = emqx_utils_ets:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]), %% Stats timer - ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), + ok = emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0), {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index d4bb9e7fc..e106e3375 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -158,7 +158,7 @@ t_stats_fun(Config) when is_list(Config) -> ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>), %% ensure stats refreshed - emqx_broker:stats_fun(), + emqx_broker_helper:stats_fun(), %% emqx_stats:set_stat is a gen_server cast %% make a synced call sync ignored = gen_server:call(emqx_stats, call, infinity), diff --git a/apps/emqx/test/emqx_stats_SUITE.erl b/apps/emqx/test/emqx_stats_SUITE.erl index 1a672fa67..1c32396ce 100644 --- a/apps/emqx/test/emqx_stats_SUITE.erl +++ b/apps/emqx/test/emqx_stats_SUITE.erl @@ -105,10 +105,10 @@ t_helper(_) -> end end, [ - {"emqx_broker", MkTestFun(emqx_broker, stats_fun)}, - {"emqx_sm", MkTestFun(emqx_sm, stats_fun)}, + {"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)}, {"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)}, - {"emqx_cm", MkTestFun(emqx_cm, stats_fun)} + {"emqx_cm", MkTestFun(emqx_cm, stats_fun)}, + {"emqx_retainer", MkTestFun(emqx_retainer, stats_fun)} ]. with_proc(F) -> diff --git a/changes/ce/fix-12765.en.md b/changes/ce/fix-12765.en.md new file mode 100644 index 000000000..01c13146d --- /dev/null +++ b/changes/ce/fix-12765.en.md @@ -0,0 +1,2 @@ +Make sure stats `'subscribers.count'` `'subscribers.max'` countains shared-subscribers. +It only contains non-shared subscribers previously.