Remove the alarm_fun arg from emqx_mqueue:new/3
This commit is contained in:
parent
4297033415
commit
eeeed35e2a
|
@ -64,9 +64,7 @@ init([Pool, Id, Node, Topic, Options]) ->
|
|||
emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]),
|
||||
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
|
||||
%%TODO: queue....
|
||||
MQueue = emqx_mqueue:new(qname(Node, Topic),
|
||||
[{max_len, State#state.max_queue_len}],
|
||||
emqx_alarm:alarm_fun()),
|
||||
MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}]),
|
||||
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
|
||||
false ->
|
||||
{stop, {cannot_connect_node, Node}}
|
||||
|
|
|
@ -5,8 +5,7 @@
|
|||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%%%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
|
@ -43,14 +42,13 @@
|
|||
-module(emqx_mqueue).
|
||||
|
||||
%% TODO: XYZ
|
||||
%%
|
||||
-include("emqx.hrl").
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-import(proplists, [get_value/3]).
|
||||
|
||||
-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1,
|
||||
-export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1,
|
||||
dropped/1, stats/1]).
|
||||
|
||||
-define(LOW_WM, 0.2).
|
||||
|
@ -79,24 +77,22 @@
|
|||
%% len of simple queue
|
||||
len = 0, max_len = 0,
|
||||
low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
|
||||
qos0 = false, dropped = 0,
|
||||
alarm_fun}).
|
||||
qos0 = false, dropped = 0}).
|
||||
|
||||
-type(mqueue() :: #mqueue{}).
|
||||
|
||||
-export_type([mqueue/0, priority/0, option/0]).
|
||||
|
||||
%% @doc New Queue.
|
||||
-spec(new(iolist(), list(option()), fun()) -> mqueue()).
|
||||
new(Name, Opts, AlarmFun) ->
|
||||
%% @doc New queue.
|
||||
-spec(new(iolist(), list(option())) -> mqueue()).
|
||||
new(Name, Opts) ->
|
||||
Type = get_value(type, Opts, simple),
|
||||
MaxLen = get_value(max_length, Opts, 0),
|
||||
init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
|
||||
len = 0, max_len = MaxLen,
|
||||
low_wm = low_wm(MaxLen, Opts),
|
||||
high_wm = high_wm(MaxLen, Opts),
|
||||
qos0 = get_value(store_qos0, Opts, false),
|
||||
alarm_fun = AlarmFun}, Opts).
|
||||
qos0 = get_value(store_qos0, Opts, false)}, Opts).
|
||||
|
||||
init_q(MQ = #mqueue{type = simple}, _Opts) ->
|
||||
MQ#mqueue{q = queue:new()};
|
||||
|
@ -163,7 +159,7 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped
|
|||
{{value, _Old}, Q2} = queue:out(Q),
|
||||
MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
|
||||
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
|
||||
maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
|
||||
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
|
||||
|
||||
in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
|
||||
priorities = Priorities,
|
||||
|
@ -199,28 +195,8 @@ out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
|
|||
{R, MQ#mqueue{q = Q2, len = Len - 1}};
|
||||
out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
|
||||
{R, Q2} = queue:out(Q),
|
||||
{R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
|
||||
{R, MQ#mqueue{q = Q2, len = Len - 1}};
|
||||
out(MQ = #mqueue{type = priority, q = Q}) ->
|
||||
{R, Q2} = ?PQUEUE:out(Q),
|
||||
{R, MQ#mqueue{q = Q2}}.
|
||||
|
||||
maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) ->
|
||||
MQ;
|
||||
maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
|
||||
when Len > HighWM ->
|
||||
Alarm = #alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
|
||||
severity = warning,
|
||||
title = io_lib:format("Queue ~s high-water mark", [Name]),
|
||||
summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
|
||||
MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
|
||||
maybe_set_alarm(MQ) ->
|
||||
MQ.
|
||||
|
||||
maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) ->
|
||||
MQ;
|
||||
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
|
||||
when Len < LowWM ->
|
||||
MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};
|
||||
maybe_clear_alarm(MQ) ->
|
||||
MQ.
|
||||
|
||||
|
|
|
@ -268,7 +268,7 @@ init(#{clean_start := CleanStart,
|
|||
MaxInflight = get_value(max_inflight, Env, 0),
|
||||
EnableStats = get_value(enable_stats, Env, false),
|
||||
IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false),
|
||||
MQueue = ?MQueue:new(ClientId, QEnv, emqx_alarm:alarm_fun()),
|
||||
MQueue = ?MQueue:new(ClientId, QEnv),
|
||||
State = #state{clean_start = CleanStart,
|
||||
binding = binding(ClientPid),
|
||||
client_id = ClientId,
|
||||
|
|
Loading…
Reference in New Issue