From 2ff78d7fc6a8c60246b0124589e5b971fcb45b5a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 24 Jun 2015 00:37:02 +0800 Subject: [PATCH] alarm --- include/emqttd.hrl | 17 ++++++++-- src/emqttd_alarm.erl | 75 ++++++++++++++++++++++++++++++++---------- src/emqttd_mqueue.erl | 45 +++++++++++++------------ src/emqttd_session.erl | 2 +- 4 files changed, 97 insertions(+), 42 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 34aa3b4bd..bf76cb389 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -111,14 +111,15 @@ -type mqtt_msgid() :: undefined | 1..16#ffff. -record(mqtt_message, { - topic :: binary(), %% The topic published to + topic :: binary(), %% Topic that the message is published to from :: binary() | atom(), %% ClientId of publisher qos = 0 :: 0 | 1 | 2, %% Message QoS retain = false :: boolean(), %% Retain flag dup = false :: boolean(), %% Dup flag sys = false :: boolean(), %% $SYS flag msgid :: mqtt_msgid(), %% Message ID - payload :: binary() %% Payload + payload :: binary(), %% Payload + timestamp :: erlang:timestamp() %% Timestamp }). -type mqtt_message() :: #mqtt_message{}. @@ -135,4 +136,16 @@ -type mqtt_plugin() :: #mqtt_plugin{}. +%%------------------------------------------------------------------------------ +%% MQTT Alarm +%%------------------------------------------------------------------------------ +-record(mqtt_alarm, { + id :: binary(), + severity :: warning | error | critical, + title :: binary(), + summary :: binary(), + timestamp :: erlang:timestamp() %% Timestamp +}). + +-type mqtt_alarm() :: #mqtt_alarm{}. diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index 73c432534..0c7061490 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -27,17 +27,18 @@ -module(emqttd_alarm). --export([start_link/0, set_alarm/1, clear_alarm/1, get_alarms/0, - add_alarm_handler/1, add_alarm_handler/2, - delete_alarm_handler/1]). +-include("emqttd.hrl"). + +-export([start_link/0, alarm_fun/0, get_alarms/0, + set_alarm/1, clear_alarm/1, + add_alarm_handler/1, add_alarm_handler/2, + delete_alarm_handler/1]). -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2]). -define(SERVER, ?MODULE). --type alarm() :: {AlarmId :: any(), AlarmDescription :: string() | binary()}. - start_link() -> case gen_event:start_link({local, ?SERVER}) of {ok, Pid} -> @@ -47,12 +48,22 @@ start_link() -> Error end. --spec set_alarm(alarm()) -> ok. -set_alarm(Alarm) -> +alarm_fun() -> + alarm_fun(false). + +alarm_fun(Bool) -> + fun(alert, _Alarm) when Bool =:= true -> alarm_fun(true); + (alert, Alarm) when Bool =:= false -> set_alarm(Alarm), alarm_fun(true); + (clear, AlarmId) when Bool =:= true -> clear_alarm(AlarmId), alarm_fun(false); + (clear, _AlarmId) when Bool =:= false -> alarm_fun(false) + end. + +-spec set_alarm(mqtt_alarm()) -> ok. +set_alarm(Alarm) when is_record(Alarm, mqtt_alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}). -spec clear_alarm(any()) -> ok. -clear_alarm(AlarmId) -> +clear_alarm(AlarmId) when is_binary(AlarmId) -> gen_event:notify(?SERVER, {clear_alarm, AlarmId}). get_alarms() -> @@ -70,25 +81,38 @@ delete_alarm_handler(Module) when is_atom(Module) -> %%----------------------------------------------------------------- %% Default Alarm handler %%----------------------------------------------------------------- - -init(_) -> {ok, []}. +init(_) -> + {ok, []}. -handle_event({set_alarm, Alarm}, Alarms)-> - %%TODO: publish to $SYS - {ok, [Alarm | Alarms]}; +handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, + severity = Severity, + title = Title, + summary = Summary}}, Alarms)-> + Timestamp = os:timestamp(), + Json = mochijson2:encode([{id, AlarmId}, + {severity, Severity}, + {title, iolist_to_binary(Title)}, + {summary, iolist_to_binary(Summary)}, + {ts, emqttd_util:now_to_secs(Timestamp)}]), + emqttd_pubsub:publish(alarm_msg(alert, AlarmId, Json)), + {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]}; handle_event({clear_alarm, AlarmId}, Alarms)-> - %TODO: publish to $SYS - {ok, lists:keydelete(AlarmId, 1, Alarms)}; + Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_util:now_to_secs()}]), + emqttd_pubsub:publish(alarm_msg(clear, AlarmId, Json)), + {ok, lists:keydelete(AlarmId, 2, Alarms)}; handle_event(_, Alarms)-> {ok, Alarms}. -handle_info(_, Alarms) -> {ok, Alarms}. +handle_info(_, Alarms) -> + {ok, Alarms}. -handle_call(get_alarms, Alarms) -> {ok, Alarms, Alarms}; +handle_call(get_alarms, Alarms) -> + {ok, Alarms, Alarms}; -handle_call(_Query, Alarms) -> {ok, {error, bad_query}, Alarms}. +handle_call(_Query, Alarms) -> + {ok, {error, bad_query}, Alarms}. terminate(swap, Alarms) -> {?MODULE, Alarms}; @@ -96,3 +120,18 @@ terminate(swap, Alarms) -> terminate(_, _) -> ok. +alarm_msg(Type, AlarmId, Json) -> + #mqtt_message{from = alarm, + qos = 1, + sys = true, + topic = topic(Type, AlarmId), + payload = iolist_to_binary(Json), + timestamp = os:timestamp()}. + +topic(alert, AlarmId) -> + emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); + +topic(clear, AlarmId) -> + emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>). + + diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index d30e755da..956a6dad5 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -55,7 +55,7 @@ -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). --export([new/2, name/1, +-export([new/3, name/1, is_empty/1, is_full/1, len/1, in/2, out/1]). @@ -64,18 +64,17 @@ -define(HIGH_WM, 0.6). -record(mqueue, {name, - q = queue:new(), %% pending queue - len = 0, %% current queue len - low_wm = ?LOW_WM, - high_wm = ?HIGH_WM, - max_len = ?MAX_LEN, - qos0 = false, - alarm = false}). + q = queue:new(), %% pending queue + len = 0, %% current queue len + low_wm = ?LOW_WM, + high_wm = ?HIGH_WM, + max_len = ?MAX_LEN, + qos0 = false, + alarm_fun}). -type mqueue() :: #mqueue{}. -type mqueue_option() :: {max_length, pos_integer()} %% Max queue length - | {inflight_window, pos_integer()} %% Inflight Window | {low_watermark, float()} %% Low watermark | {high_watermark, float()} %% High watermark | {queue_qos0, boolean()}. %% Queue Qos0 @@ -86,14 +85,15 @@ %% @doc New Queue. %% @end %%------------------------------------------------------------------------------ --spec new(binary(), list(mqueue_option())) -> mqueue(). -new(Name, Opts) -> +-spec new(binary(), list(mqueue_option()), fun()) -> mqueue(). +new(Name, Opts, AlarmFun) -> MaxLen = emqttd_opts:g(max_length, Opts, 1000), #mqueue{name = Name, max_len = MaxLen, low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)), high_wm = round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)), - qos0 = emqttd_opts:g(queue_qos0, Opts, true)}. + qos0 = emqttd_opts:g(queue_qos0, Opts, true), + alarm_fun = AlarmFun}. name(#mqueue{name = Name}) -> Name. @@ -135,18 +135,21 @@ out(MQ = #mqueue{q = Q, len = Len}) -> {Result, Q2} = queue:out(Q), {Result, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}. -maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm = false}) - when Len >= HighWM -> - AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]), - emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}), - MQ#mqueue{alarm = true}; +maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) + when Len > HighWM -> + Alarm = #mqtt_alarm{id = list_to_binary(["queue_high_watermark.", Name]), + severity = warning, + title = io_lib:format("Queue ~s high-water mark", [Name]), + summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])}, + MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)}; + maybe_set_alarm(MQ) -> MQ. -maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm = true}) - when Len =< LowWM -> - emqttd_alarm:clear_alarm({queue_high_watermark, Name}), - MQ#mqueue{alarm = false}; +maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun}) + when Len < LowWM -> + MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))}; + maybe_clear_alarm(MQ) -> MQ. diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 5cba1619d..e6bc4dce5 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -222,7 +222,7 @@ init([CleanSess, ClientId, ClientPid]) -> subscriptions = [], inflight_queue = [], max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0), - message_queue = emqttd_mqueue:new(ClientId, QEnv), + message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), awaiting_rel = #{}, awaiting_ack = #{}, awaiting_comp = #{},