%%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_retainer_schema). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -export([roots/0, fields/1, desc/1, namespace/0]). -define(DEFAULT_INDICES, [ [1, 2, 3], [1, 3], [2, 3], [3] ]). -define(INVALID_SPEC(_REASON_), throw({_REASON_, #{default => ?DEFAULT_INDICES}})). namespace() -> "retainer". roots() -> [ {"retainer", hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{ converter => fun retainer_converter/2 })} ]. fields("retainer") -> [ {enable, sc(boolean(), enable, true)}, {msg_expiry_interval, sc( %% not used in a `receive ... after' block, just timestamp comparison emqx_schema:duration_ms(), msg_expiry_interval, <<"0s">> )}, {msg_clear_interval, sc( emqx_schema:timeout_duration_ms(), msg_clear_interval, <<"0s">> )}, {flow_control, sc( ?R_REF(flow_control), flow_control, #{}, ?IMPORTANCE_HIDDEN )}, {max_payload_size, sc( emqx_schema:bytesize(), max_payload_size, <<"1MB">> )}, {stop_publish_clear_msg, sc( boolean(), stop_publish_clear_msg, false )}, {delivery_rate, ?HOCON( emqx_limiter_schema:rate(), #{ required => false, desc => ?DESC(delivery_rate), example => <<"1000/s">>, aliases => [deliver_rate] } )}, {backend, backend_config()} ]; fields(mnesia_config) -> [ {type, sc(built_in_database, mnesia_config_type, built_in_database)}, {storage_type, sc( hoconsc:enum([ram, disc]), mnesia_config_storage_type, ram )}, {max_retained_messages, sc( non_neg_integer(), max_retained_messages, 0 )}, {index_specs, fun retainer_indices/1} ]; fields(flow_control) -> [ {batch_read_number, sc( non_neg_integer(), batch_read_number, 0 )}, {batch_deliver_number, sc( non_neg_integer(), batch_deliver_number, 0 )}, {batch_deliver_limiter, sc( ?R_REF(emqx_limiter_schema, internal), batch_deliver_limiter, undefined )} ]. desc("retainer") -> "Configuration related to handling `PUBLISH` packets with a `retain` flag set to 1."; desc(mnesia_config) -> "Configuration of the internal database storing retained messages."; desc(flow_control) -> "Retainer batching and rate limiting."; desc(_) -> undefined. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- %%sc(Type, DescId) -> %% hoconsc:mk(Type, #{desc => ?DESC(DescId)}). sc(Type, DescId, Default) -> sc(Type, DescId, Default, ?DEFAULT_IMPORTANCE). sc(Type, DescId, Default, Importance) -> hoconsc:mk(Type, #{default => Default, desc => ?DESC(DescId), importance => Importance}). backend_config() -> hoconsc:mk(hoconsc:ref(?MODULE, mnesia_config), #{desc => ?DESC(backend)}). retainer_indices(type) -> list(list(integer())); retainer_indices(desc) -> "Retainer index specifications: list of arrays of positive ascending integers. " "Each array specifies an index. Numbers in an index specification are 1-based " "word positions in topics. Words from specified positions will be used for indexing.
" "For example, it is good to have [2, 4] index to optimize " "+/X/+/Y/... topic wildcard subscriptions."; retainer_indices(example) -> [[2, 4], [1, 3]]; retainer_indices(default) -> ?DEFAULT_INDICES; retainer_indices(validator) -> fun check_index_specs/1; retainer_indices(_) -> undefined. check_index_specs([]) -> ok; check_index_specs(IndexSpecs) when is_list(IndexSpecs) -> lists:foreach(fun check_index_spec/1, IndexSpecs), check_duplicate(IndexSpecs); check_index_specs(_IndexSpecs) -> ?INVALID_SPEC(list_index_spec_limited). check_index_spec([]) -> ?INVALID_SPEC(non_empty_index_spec_limited); check_index_spec(IndexSpec) when is_list(IndexSpec) -> case lists:all(fun(Idx) -> is_integer(Idx) andalso Idx > 0 end, IndexSpec) of false -> ?INVALID_SPEC(pos_integer_index_limited); true -> check_duplicate(IndexSpec) end; check_index_spec(_IndexSpec) -> ?INVALID_SPEC(list_index_spec_limited). check_duplicate(List) -> case length(List) =:= length(lists:usort(List)) of false -> ?INVALID_SPEC(unique_index_spec_limited); true -> ok end. retainer_converter(#{<<"delivery_rate">> := <<"infinity">>} = Conf, _Opts) -> Conf#{ <<"flow_control">> => #{ <<"batch_read_number">> => 0, <<"batch_deliver_number">> => 0 } }; retainer_converter(#{<<"delivery_rate">> := RateStr} = Conf, _Opts) -> {ok, RateNum} = emqx_limiter_schema:to_rate(RateStr), RawRate = erlang:floor(RateNum * 1000 / emqx_limiter_schema:default_period()), Control = #{ <<"batch_read_number">> => RawRate, <<"batch_deliver_number">> => RawRate, %% Set the maximum delivery rate per session <<"batch_deliver_limiter">> => #{<<"client">> => #{<<"rate">> => RateStr}} }, Conf#{<<"flow_control">> => Control}; retainer_converter(#{<<"deliver_rate">> := Delivery} = Conf, Opts) -> Conf1 = maps:remove(<<"deliver_rate">>, Conf), retainer_converter(Conf1#{<<"delivery_rate">> => Delivery}, Opts); retainer_converter(Conf, _Opts) -> Conf.