From eeeed35e2a46ecbeee958c551c34351d7af10835 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 19 Jul 2018 10:46:05 +0800 Subject: [PATCH] Remove the alarm_fun arg from emqx_mqueue:new/3 --- src/emqx_bridge.erl | 4 +--- src/emqx_mqueue.erl | 42 +++++++++--------------------------------- src/emqx_session.erl | 2 +- 3 files changed, 11 insertions(+), 37 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index ee2aa1535..517b32dcd 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -64,9 +64,7 @@ init([Pool, Id, Node, Topic, Options]) -> emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), %%TODO: queue.... - MQueue = emqx_mqueue:new(qname(Node, Topic), - [{max_len, State#state.max_queue_len}], - emqx_alarm:alarm_fun()), + MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}]), {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; false -> {stop, {cannot_connect_node, Node}} diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index db2f30cb6..43bb8654a 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -5,8 +5,7 @@ %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software +%%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and @@ -43,14 +42,13 @@ -module(emqx_mqueue). %% TODO: XYZ -%% -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -import(proplists, [get_value/3]). --export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, +-export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, dropped/1, stats/1]). -define(LOW_WM, 0.2). @@ -79,24 +77,22 @@ %% len of simple queue len = 0, max_len = 0, low_wm = ?LOW_WM, high_wm = ?HIGH_WM, - qos0 = false, dropped = 0, - alarm_fun}). + qos0 = false, dropped = 0}). -type(mqueue() :: #mqueue{}). -export_type([mqueue/0, priority/0, option/0]). -%% @doc New Queue. --spec(new(iolist(), list(option()), fun()) -> mqueue()). -new(Name, Opts, AlarmFun) -> +%% @doc New queue. +-spec(new(iolist(), list(option())) -> mqueue()). +new(Name, Opts) -> Type = get_value(type, Opts, simple), MaxLen = get_value(max_length, Opts, 0), init_q(#mqueue{type = Type, name = iolist_to_binary(Name), len = 0, max_len = MaxLen, low_wm = low_wm(MaxLen, Opts), high_wm = high_wm(MaxLen, Opts), - qos0 = get_value(store_qos0, Opts, false), - alarm_fun = AlarmFun}, Opts). + qos0 = get_value(store_qos0, Opts, false)}, Opts). init_q(MQ = #mqueue{type = simple}, _Opts) -> MQ#mqueue{q = queue:new()}; @@ -163,7 +159,7 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped {{value, _Old}, Q2} = queue:out(Q), MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1}; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> - maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}); + MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, @@ -199,28 +195,8 @@ out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> {R, MQ#mqueue{q = Q2, len = Len - 1}}; out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> {R, Q2} = queue:out(Q), - {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}; + {R, MQ#mqueue{q = Q2, len = Len - 1}}; out(MQ = #mqueue{type = priority, q = Q}) -> {R, Q2} = ?PQUEUE:out(Q), {R, MQ#mqueue{q = Q2}}. -maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) -> - MQ; -maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) - when Len > HighWM -> - Alarm = #alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), - severity = warning, - title = io_lib:format("Queue ~s high-water mark", [Name]), - summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])}, - MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)}; -maybe_set_alarm(MQ) -> - MQ. - -maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) -> - MQ; -maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun}) - when Len < LowWM -> - MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))}; -maybe_clear_alarm(MQ) -> - MQ. - diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 607a3644f..02c7152b9 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -268,7 +268,7 @@ init(#{clean_start := CleanStart, MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), - MQueue = ?MQueue:new(ClientId, QEnv, emqx_alarm:alarm_fun()), + MQueue = ?MQueue:new(ClientId, QEnv), State = #state{clean_start = CleanStart, binding = binding(ClientPid), client_id = ClientId,