fix(sessds): Rename the durable messages DB to `messages`

This commit is contained in:
ieQu1 2024-05-16 18:26:13 +02:00
parent ba74135079
commit ee6e7174cf
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
8 changed files with 56 additions and 33 deletions

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_PERSISTENT_MESSAGE_HRL).
-define(EMQX_PERSISTENT_MESSAGE_HRL, true).
-define(PERSISTENT_MESSAGE_DB, messages).
-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
-define(WITH_DURABILITY_ENABLED(DO),
case is_persistence_enabled() of
true -> DO;
false -> {skipped, disabled}
end
).
-endif.

View File

@ -32,15 +32,7 @@
persist/1 persist/1
]). ]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). -include("emqx_persistent_message.hrl").
-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
-define(WHEN_ENABLED(DO),
case is_persistence_enabled() of
true -> DO;
false -> {skipped, disabled}
end
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -51,7 +43,7 @@ init() ->
Zones = maps:keys(emqx_config:get([zones])), Zones = maps:keys(emqx_config:get([zones])),
IsEnabled = lists:any(fun is_persistence_enabled/1, Zones), IsEnabled = lists:any(fun is_persistence_enabled/1, Zones),
persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled), persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled),
?WHEN_ENABLED(begin ?WITH_DURABILITY_ENABLED(begin
?SLOG(notice, #{msg => "Session durability is enabled"}), ?SLOG(notice, #{msg => "Session durability is enabled"}),
Backend = storage_backend(), Backend = storage_backend(),
ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend), ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
@ -100,7 +92,7 @@ pre_config_update(_Root, _NewConf, _OldConf) ->
-spec persist(emqx_types:message()) -> -spec persist(emqx_types:message()) ->
emqx_ds:store_batch_result() | {skipped, needs_no_persistence}. emqx_ds:store_batch_result() | {skipped, needs_no_persistence}.
persist(Msg) -> persist(Msg) ->
?WHEN_ENABLED( ?WITH_DURABILITY_ENABLED(
case needs_persistence(Msg) andalso has_subscribers(Msg) of case needs_persistence(Msg) andalso has_subscribers(Msg) of
true -> true ->
store_message(Msg); store_message(Msg);

View File

@ -16,7 +16,7 @@
-ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL). -ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL).
-define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true). -define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). -include("emqx_persistent_message.hrl").
-define(SESSION_TAB, emqx_ds_session). -define(SESSION_TAB, emqx_ds_session).
-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions). -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).

View File

@ -27,7 +27,7 @@
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). -include("emqx_persistent_message.hrl").
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).

View File

@ -26,7 +26,8 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). -include("emqx_persistent_message.hrl").
-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n"). -define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -186,7 +186,7 @@ prometheus_per_db(NodeOrAggr) ->
%% This function returns the data in the following format: %% This function returns the data in the following format:
%% ``` %% ```
%% #{emqx_ds_store_batch_time => %% #{emqx_ds_store_batch_time =>
%% [{[{db, emqx_persistent_message}], 42}], %% [{[{db, messages}], 42}],
%% ... %% ...
%% ''' %% '''
%% %%
@ -222,11 +222,11 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) ->
%% This function returns the data in the following format: %% This function returns the data in the following format:
%% ``` %% ```
%% #{emqx_ds_egress_batches => %% #{emqx_ds_egress_batches =>
%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408}, %% [{[{db,messages},{shard,<<"1">>}],99408},
%% {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}], %% {[{db,messages},{shard,<<"0">>}],99409}],
%% emqx_ds_egress_batches_retry => %% emqx_ds_egress_batches_retry =>
%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],0}, %% [{[{db,messages},{shard,<<"1">>}],0},
%% {[{db,emqx_persistent_message},{shard,<<"0">>}],0}], %% {[{db,messages},{shard,<<"0">>}],0}],
%% emqx_ds_egress_messages => %% emqx_ds_egress_messages =>
%% ... %% ...
%% } %% }

View File

@ -21,6 +21,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl"). -include_lib("emqx_utils/include/emqx_utils_api.hrl").
-include_lib("emqx/include/emqx_persistent_message.hrl").
-import(hoconsc, [mk/2, ref/1, enum/1, array/1]). -import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
@ -222,7 +223,7 @@ fields(sites_shard) ->
atom(), atom(),
#{ #{
desc => <<"Durable storage ID">>, desc => <<"Durable storage ID">>,
example => 'emqx_persistent_message' example => ?PERSISTENT_MESSAGE_DB
} }
)}, )},
{id, {id,
@ -249,7 +250,7 @@ fields(db) ->
atom(), atom(),
#{ #{
desc => <<"Name of the durable storage">>, desc => <<"Name of the durable storage">>,
example => 'emqx_persistent_message' example => ?PERSISTENT_MESSAGE_DB
} }
)}, )},
{shards, {shards,
@ -403,7 +404,7 @@ param_storage_id() ->
required => true, required => true,
in => path, in => path,
desc => <<"Durable storage ID">>, desc => <<"Durable storage ID">>,
example => emqx_persistent_message example => ?PERSISTENT_MESSAGE_DB
}, },
{ds, mk(enum(dbs()), Info)}. {ds, mk(enum(dbs()), Info)}.
@ -416,7 +417,7 @@ example_site() ->
end. end.
dbs() -> dbs() ->
[emqx_persistent_message]. [?PERSISTENT_MESSAGE_DB].
shards_of_site(Site) -> shards_of_site(Site) ->
lists:flatmap( lists:flatmap(

View File

@ -59,7 +59,7 @@ t_get_storages(_) ->
Path = api_path(["ds", "storages"]), Path = api_path(["ds", "storages"]),
{ok, Response} = request_api(get, Path), {ok, Response} = request_api(get, Path),
?assertEqual( ?assertEqual(
[<<"emqx_persistent_message">>], [<<"messages">>],
emqx_utils_json:decode(Response, [return_maps]) emqx_utils_json:decode(Response, [return_maps])
). ).
@ -81,7 +81,7 @@ t_get_site(_) ->
<<"shards">> := <<"shards">> :=
[ [
#{ #{
<<"storage">> := <<"emqx_persistent_message">>, <<"storage">> := <<"messages">>,
<<"id">> := _, <<"id">> := _,
<<"status">> := <<"up">> <<"status">> := <<"up">>
} }
@ -99,12 +99,12 @@ t_get_db(_) ->
request_api(get, Path400) request_api(get, Path400)
), ),
%% Valid path: %% Valid path:
Path = api_path(["ds", "storages", "emqx_persistent_message"]), Path = api_path(["ds", "storages", "messages"]),
{ok, Response} = request_api(get, Path), {ok, Response} = request_api(get, Path),
ThisSite = emqx_ds_replication_layer_meta:this_site(), ThisSite = emqx_ds_replication_layer_meta:this_site(),
?assertMatch( ?assertMatch(
#{ #{
<<"name">> := <<"emqx_persistent_message">>, <<"name">> := <<"messages">>,
<<"shards">> := <<"shards">> :=
[ [
#{ #{
@ -132,7 +132,7 @@ t_get_replicas(_) ->
request_api(get, Path400) request_api(get, Path400)
), ),
%% Valid path: %% Valid path:
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]), Path = api_path(["ds", "storages", "messages", "replicas"]),
{ok, Response} = request_api(get, Path), {ok, Response} = request_api(get, Path),
ThisSite = emqx_ds_replication_layer_meta:this_site(), ThisSite = emqx_ds_replication_layer_meta:this_site(),
?assertEqual( ?assertEqual(
@ -141,7 +141,7 @@ t_get_replicas(_) ->
). ).
t_put_replicas(_) -> t_put_replicas(_) ->
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]), Path = api_path(["ds", "storages", "messages", "replicas"]),
%% Error cases: %% Error cases:
?assertMatch( ?assertMatch(
{ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}}, {ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}},
@ -154,13 +154,13 @@ t_put_replicas(_) ->
). ).
t_join(_) -> t_join(_) ->
Path400 = api_path(["ds", "storages", "emqx_persistent_message", "replicas", "unknown_site"]), Path400 = api_path(["ds", "storages", "messages", "replicas", "unknown_site"]),
?assertMatch( ?assertMatch(
{error, {_, 400, _}}, {error, {_, 400, _}},
parse_error(request_api(put, Path400)) parse_error(request_api(put, Path400))
), ),
ThisSite = emqx_ds_replication_layer_meta:this_site(), ThisSite = emqx_ds_replication_layer_meta:this_site(),
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]), Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]),
?assertMatch( ?assertMatch(
{ok, "OK"}, {ok, "OK"},
request_api(put, Path) request_api(put, Path)
@ -168,7 +168,7 @@ t_join(_) ->
t_leave(_) -> t_leave(_) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(), ThisSite = emqx_ds_replication_layer_meta:this_site(),
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]), Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]),
?assertMatch( ?assertMatch(
{error, {_, 400, _}}, {error, {_, 400, _}},
request_api(delete, Path) request_api(delete, Path)
@ -176,7 +176,7 @@ t_leave(_) ->
t_leave_notfound(_) -> t_leave_notfound(_) ->
Site = "not_part_of_replica_set", Site = "not_part_of_replica_set",
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", Site]), Path = api_path(["ds", "storages", "messages", "replicas", Site]),
?assertMatch( ?assertMatch(
{error, {_, 404, _}}, {error, {_, 404, _}},
request_api(delete, Path) request_api(delete, Path)