Merge pull request #11159 from lafirest/fix/retainer_cfg
fix(retainer): Fix missing changes in configuration
This commit is contained in:
commit
ca52310f2b
|
@ -191,14 +191,14 @@ check_duplicate(List) ->
|
||||||
true -> ok
|
true -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retainer_converter(#{<<"deliver_rate">> := <<"infinity">>} = Conf, _Opts) ->
|
retainer_converter(#{<<"delivery_rate">> := <<"infinity">>} = Conf, _Opts) ->
|
||||||
Conf#{
|
Conf#{
|
||||||
<<"flow_control">> => #{
|
<<"flow_control">> => #{
|
||||||
<<"batch_read_number">> => 0,
|
<<"batch_read_number">> => 0,
|
||||||
<<"batch_deliver_number">> => 0
|
<<"batch_deliver_number">> => 0
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
retainer_converter(#{<<"deliver_rate">> := RateStr} = Conf, _Opts) ->
|
retainer_converter(#{<<"delivery_rate">> := RateStr} = Conf, _Opts) ->
|
||||||
{ok, RateNum} = emqx_limiter_schema:to_rate(RateStr),
|
{ok, RateNum} = emqx_limiter_schema:to_rate(RateStr),
|
||||||
RawRate = erlang:floor(RateNum * 1000 / emqx_limiter_schema:default_period()),
|
RawRate = erlang:floor(RateNum * 1000 / emqx_limiter_schema:default_period()),
|
||||||
Control = #{
|
Control = #{
|
||||||
|
@ -208,5 +208,8 @@ retainer_converter(#{<<"deliver_rate">> := RateStr} = Conf, _Opts) ->
|
||||||
<<"batch_deliver_limiter">> => #{<<"client">> => #{<<"rate">> => RateStr}}
|
<<"batch_deliver_limiter">> => #{<<"client">> => #{<<"rate">> => RateStr}}
|
||||||
},
|
},
|
||||||
Conf#{<<"flow_control">> => Control};
|
Conf#{<<"flow_control">> => Control};
|
||||||
|
retainer_converter(#{<<"deliver_rate">> := Delivery} = Conf, Opts) ->
|
||||||
|
Conf1 = maps:remove(<<"deliver_rate">>, Conf),
|
||||||
|
retainer_converter(Conf1#{<<"delivery_rate">> => Delivery}, Opts);
|
||||||
retainer_converter(Conf, _Opts) ->
|
retainer_converter(Conf, _Opts) ->
|
||||||
Conf.
|
Conf.
|
||||||
|
|
|
@ -758,6 +758,21 @@ t_compatibility_for_deliver_rate(_) ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Parser(R2)
|
Parser(R2)
|
||||||
|
),
|
||||||
|
|
||||||
|
DeliveryInf = <<"retainer.delivery_rate = \"infinity\"">>,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"retainer">> :=
|
||||||
|
#{
|
||||||
|
<<"flow_control">> := #{
|
||||||
|
<<"batch_deliver_number">> := 0,
|
||||||
|
<<"batch_read_number">> := 0,
|
||||||
|
<<"batch_deliver_limiter">> := #{<<"rate">> := infinity}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Parser(DeliveryInf)
|
||||||
).
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -15,7 +15,7 @@ retainer {
|
||||||
stop_publish_clear_msg = false
|
stop_publish_clear_msg = false
|
||||||
|
|
||||||
## Maximum retained messages delivery rate per session
|
## Maximum retained messages delivery rate per session
|
||||||
deliver_rate = "1000/s"
|
delivery_rate = "1000/s"
|
||||||
|
|
||||||
## Retained messages store backend
|
## Retained messages store backend
|
||||||
backend {
|
backend {
|
||||||
|
|
Loading…
Reference in New Issue