From ee6e7174cf5a7b858fe59fdeafa95787c3111fff Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 16 May 2024 18:26:13 +0200 Subject: [PATCH] fix(sessds): Rename the durable messages DB to `messages` --- apps/emqx/include/emqx_persistent_message.hrl | 29 +++++++++++++++++++ apps/emqx/src/emqx_persistent_message.erl | 14 ++------- apps/emqx/src/emqx_persistent_session_ds.hrl | 2 +- .../test/emqx_persistent_messages_SUITE.erl | 2 +- .../test/emqx_persistent_session_SUITE.erl | 3 +- .../src/emqx_ds_builtin_metrics.erl | 10 +++---- apps/emqx_management/src/emqx_mgmt_api_ds.erl | 9 +++--- .../test/emqx_mgmt_api_ds_SUITE.erl | 20 ++++++------- 8 files changed, 56 insertions(+), 33 deletions(-) create mode 100644 apps/emqx/include/emqx_persistent_message.hrl diff --git a/apps/emqx/include/emqx_persistent_message.hrl b/apps/emqx/include/emqx_persistent_message.hrl new file mode 100644 index 000000000..256ec724f --- /dev/null +++ b/apps/emqx/include/emqx_persistent_message.hrl @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_PERSISTENT_MESSAGE_HRL). +-define(EMQX_PERSISTENT_MESSAGE_HRL, true). + +-define(PERSISTENT_MESSAGE_DB, messages). +-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled). + +-define(WITH_DURABILITY_ENABLED(DO), + case is_persistence_enabled() of + true -> DO; + false -> {skipped, disabled} + end +). + +-endif. diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index dc991619b..30f5c6b45 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -32,15 +32,7 @@ persist/1 ]). --define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). --define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled). - --define(WHEN_ENABLED(DO), - case is_persistence_enabled() of - true -> DO; - false -> {skipped, disabled} - end -). +-include("emqx_persistent_message.hrl"). %%-------------------------------------------------------------------- @@ -51,7 +43,7 @@ init() -> Zones = maps:keys(emqx_config:get([zones])), IsEnabled = lists:any(fun is_persistence_enabled/1, Zones), persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled), - ?WHEN_ENABLED(begin + ?WITH_DURABILITY_ENABLED(begin ?SLOG(notice, #{msg => "Session durability is enabled"}), Backend = storage_backend(), ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend), @@ -100,7 +92,7 @@ pre_config_update(_Root, _NewConf, _OldConf) -> -spec persist(emqx_types:message()) -> emqx_ds:store_batch_result() | {skipped, needs_no_persistence}. persist(Msg) -> - ?WHEN_ENABLED( + ?WITH_DURABILITY_ENABLED( case needs_persistence(Msg) andalso has_subscribers(Msg) of true -> store_message(Msg); diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 12372e5be..fdbf2c6ea 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -16,7 +16,7 @@ -ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL). -define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true). --define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). +-include("emqx_persistent_message.hrl"). -define(SESSION_TAB, emqx_ds_session). -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 492fcaa6b..6ad91f760 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -27,7 +27,7 @@ -import(emqx_common_test_helpers, [on_exit/1]). --define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). +-include("emqx_persistent_message.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index f0b783250..68cb5cd3b 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -26,7 +26,8 @@ -compile(export_all). -compile(nowarn_export_all). --define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). +-include("emqx_persistent_message.hrl"). + -define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n"). %%-------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index 763d38606..06bf7f045 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -186,7 +186,7 @@ prometheus_per_db(NodeOrAggr) -> %% This function returns the data in the following format: %% ``` %% #{emqx_ds_store_batch_time => -%% [{[{db, emqx_persistent_message}], 42}], +%% [{[{db, messages}], 42}], %% ... %% ''' %% @@ -222,11 +222,11 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) -> %% This function returns the data in the following format: %% ``` %% #{emqx_ds_egress_batches => -%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408}, -%% {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}], +%% [{[{db,messages},{shard,<<"1">>}],99408}, +%% {[{db,messages},{shard,<<"0">>}],99409}], %% emqx_ds_egress_batches_retry => -%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],0}, -%% {[{db,emqx_persistent_message},{shard,<<"0">>}],0}], +%% [{[{db,messages},{shard,<<"1">>}],0}, +%% {[{db,messages},{shard,<<"0">>}],0}], %% emqx_ds_egress_messages => %% ... %% } diff --git a/apps/emqx_management/src/emqx_mgmt_api_ds.erl b/apps/emqx_management/src/emqx_mgmt_api_ds.erl index 5f36bdce7..0e3962692 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_ds.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_ds.erl @@ -21,6 +21,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx_utils/include/emqx_utils_api.hrl"). +-include_lib("emqx/include/emqx_persistent_message.hrl"). -import(hoconsc, [mk/2, ref/1, enum/1, array/1]). @@ -222,7 +223,7 @@ fields(sites_shard) -> atom(), #{ desc => <<"Durable storage ID">>, - example => 'emqx_persistent_message' + example => ?PERSISTENT_MESSAGE_DB } )}, {id, @@ -249,7 +250,7 @@ fields(db) -> atom(), #{ desc => <<"Name of the durable storage">>, - example => 'emqx_persistent_message' + example => ?PERSISTENT_MESSAGE_DB } )}, {shards, @@ -403,7 +404,7 @@ param_storage_id() -> required => true, in => path, desc => <<"Durable storage ID">>, - example => emqx_persistent_message + example => ?PERSISTENT_MESSAGE_DB }, {ds, mk(enum(dbs()), Info)}. @@ -416,7 +417,7 @@ example_site() -> end. dbs() -> - [emqx_persistent_message]. + [?PERSISTENT_MESSAGE_DB]. shards_of_site(Site) -> lists:flatmap( diff --git a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl index 8048a6820..ed6fdcc7e 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl @@ -59,7 +59,7 @@ t_get_storages(_) -> Path = api_path(["ds", "storages"]), {ok, Response} = request_api(get, Path), ?assertEqual( - [<<"emqx_persistent_message">>], + [<<"messages">>], emqx_utils_json:decode(Response, [return_maps]) ). @@ -81,7 +81,7 @@ t_get_site(_) -> <<"shards">> := [ #{ - <<"storage">> := <<"emqx_persistent_message">>, + <<"storage">> := <<"messages">>, <<"id">> := _, <<"status">> := <<"up">> } @@ -99,12 +99,12 @@ t_get_db(_) -> request_api(get, Path400) ), %% Valid path: - Path = api_path(["ds", "storages", "emqx_persistent_message"]), + Path = api_path(["ds", "storages", "messages"]), {ok, Response} = request_api(get, Path), ThisSite = emqx_ds_replication_layer_meta:this_site(), ?assertMatch( #{ - <<"name">> := <<"emqx_persistent_message">>, + <<"name">> := <<"messages">>, <<"shards">> := [ #{ @@ -132,7 +132,7 @@ t_get_replicas(_) -> request_api(get, Path400) ), %% Valid path: - Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]), + Path = api_path(["ds", "storages", "messages", "replicas"]), {ok, Response} = request_api(get, Path), ThisSite = emqx_ds_replication_layer_meta:this_site(), ?assertEqual( @@ -141,7 +141,7 @@ t_get_replicas(_) -> ). t_put_replicas(_) -> - Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]), + Path = api_path(["ds", "storages", "messages", "replicas"]), %% Error cases: ?assertMatch( {ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}}, @@ -154,13 +154,13 @@ t_put_replicas(_) -> ). t_join(_) -> - Path400 = api_path(["ds", "storages", "emqx_persistent_message", "replicas", "unknown_site"]), + Path400 = api_path(["ds", "storages", "messages", "replicas", "unknown_site"]), ?assertMatch( {error, {_, 400, _}}, parse_error(request_api(put, Path400)) ), ThisSite = emqx_ds_replication_layer_meta:this_site(), - Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]), + Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]), ?assertMatch( {ok, "OK"}, request_api(put, Path) @@ -168,7 +168,7 @@ t_join(_) -> t_leave(_) -> ThisSite = emqx_ds_replication_layer_meta:this_site(), - Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]), + Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]), ?assertMatch( {error, {_, 400, _}}, request_api(delete, Path) @@ -176,7 +176,7 @@ t_leave(_) -> t_leave_notfound(_) -> Site = "not_part_of_replica_set", - Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", Site]), + Path = api_path(["ds", "storages", "messages", "replicas", Site]), ?assertMatch( {error, {_, 404, _}}, request_api(delete, Path)