diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index 0c7061490..f3a25ff68 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -121,12 +121,10 @@ terminate(_, _) -> ok. alarm_msg(Type, AlarmId, Json) -> - #mqtt_message{from = alarm, - qos = 1, - sys = true, - topic = topic(Type, AlarmId), - payload = iolist_to_binary(Json), - timestamp = os:timestamp()}. + Msg = emqttd_message:make(alarm, + topic(Type, AlarmId), + iolist_to_binary(Json)), + emqttd_message:set_flag(sys, Msg). topic(alert, AlarmId) -> emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 2129a6bdb..38183a573 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) -> true -> true = erlang:monitor_node(Node, true), State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}), - emqttd_pubsub:subscribe({SubTopic, ?QOS_0}), + emqttd_pubsub:subscribe({SubTopic, State#state.qos}), {ok, State}; false -> {stop, {cannot_connect, Node}} @@ -107,7 +107,7 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) -> - lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]), + lager:error("Bridge Dropped Msg for ~p Down: ~s", [Node, emqttd_message:format(Msg)]), {noreply, State}; handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) -> @@ -159,14 +159,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -%TODO: qos is not right... -transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos, - topic_prefix = Prefix, +transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) -> - Msg1 = - if - Qos =:= undefined -> Msg; - true -> Msg#mqtt_message{qos = Qos} - end, - Msg1#mqtt_message{topic = <>}. + Msg#mqtt_message{topic = <>}. diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index e4deb6414..730e70f16 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -186,7 +186,7 @@ foldl_hooks(Hook, Args, Acc0) -> case ets:lookup(?BROKER_TAB, {hook, Hook}) of [{_, Hooks}] -> lists:foldl(fun({_Name, {M, F, A}}, Acc) -> - apply(M, F, [Acc, Args++A]) + apply(M, F, lists:append([Args, [Acc], A])) end, Acc0, Hooks); [] -> Acc0 @@ -286,23 +286,15 @@ create_topic(Topic) -> retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), - publish(#mqtt_message{from = broker, - retain = true, - topic = <<"$SYS/brokers">>, - payload = Payload}). + Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). retain(Topic, Payload) when is_binary(Payload) -> - publish(#mqtt_message{from = broker, - retain = true, - topic = emqttd_topic:systop(Topic), - payload = Payload}). + Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), + emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)). publish(Topic, Payload) when is_binary(Payload) -> - publish( #mqtt_message{from = broker, - topic = emqttd_topic:systop(Topic), - payload = Payload}). - -publish(Msg) -> + Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), emqttd_pubsub:publish(Msg). uptime(#state{started_at = Ts}) -> diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 8b7afe54e..3e66670e6 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -49,14 +49,11 @@ handle_request('POST', "/mqtt/publish", Req) -> Qos = int(get_value("qos", Params, "0")), Retain = bool(get_value("retain", Params, "0")), Topic = list_to_binary(get_value("topic", Params)), - Message = list_to_binary(get_value("message", Params)), + Payload = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_pubsub:publish(#mqtt_message{from = http, - qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + Msg = emqttd_message:make(http, Qos, Topic, Payload), + emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index d96e5d922..4d3eaea83 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -32,12 +32,39 @@ -include("emqttd_protocol.hrl"). --export([from_packet/1, from_packet/2, to_packet/1]). +-export([make/3, make/4, from_packet/1, from_packet/2, to_packet/1]). -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). -export([format/1]). +%%------------------------------------------------------------------------------ +%% @doc Make a message +%% @end +%%------------------------------------------------------------------------------ +-spec make(From, Topic, Payload) -> mqtt_message() when + From :: atom() | binary(), + Topic :: binary(), + Payload :: binary(). +make(From, Topic, Payload) -> + #mqtt_message{topic = Topic, + from = From, + payload = Payload, + timestamp = os:timestamp()}. + +-spec make(From, Qos, Topic, Payload) -> mqtt_message() when + From :: atom() | binary(), + Qos :: mqtt_qos(), + Topic :: binary(), + Payload :: binary(). +make(From, Qos, Topic, Payload) -> + #mqtt_message{msgid = msgid(Qos), + topic = Topic, + from = From, + qos = Qos, + payload = Payload, + timestamp = os:timestamp()}. + %%------------------------------------------------------------------------------ %% @doc Message from Packet %% @end @@ -50,12 +77,14 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId}, payload = Payload}) -> - #mqtt_message{msgid = PacketId, + #mqtt_message{msgid = msgid(Qos), + pktid = PacketId, qos = Qos, retain = Retain, dup = Dup, topic = Topic, - payload = Payload}; + payload = Payload, + timestamp = os:timestamp()}; from_packet(#mqtt_packet_connect{will_flag = false}) -> undefined; @@ -64,38 +93,44 @@ from_packet(#mqtt_packet_connect{will_retain = Retain, will_qos = Qos, will_topic = Topic, will_msg = Msg}) -> - #mqtt_message{retain = Retain, - qos = Qos, - topic = Topic, - dup = false, - payload = Msg}. + #mqtt_message{msgid = msgid(Qos), + topic = Topic, + retain = Retain, + qos = Qos, + dup = false, + payload = Msg, + timestamp = os:timestamp()}. from_packet(ClientId, Packet) -> Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}. +msgid(?QOS_0) -> + undefined; +msgid(_Qos) -> + emqttd_guid:gen(). + %%------------------------------------------------------------------------------ %% @doc Message to packet %% @end %%------------------------------------------------------------------------------ -spec to_packet(mqtt_message()) -> mqtt_packet(). -to_packet(#mqtt_message{msgid = MsgId, +to_packet(#mqtt_message{pktid = PkgId, qos = Qos, retain = Retain, dup = Dup, topic = Topic, payload = Payload}) -> - PacketId = if - Qos =:= ?QOS_0 -> undefined; - true -> MsgId - end, - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = Qos, retain = Retain, dup = Dup}, variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId}, + packet_id = if + Qos =:= ?QOS_0 -> undefined; + true -> PkgId + end + }, payload = Payload}. %%------------------------------------------------------------------------------ @@ -109,6 +144,8 @@ set_flag(Msg) -> -spec set_flag(atom(), mqtt_message()) -> mqtt_message(). set_flag(dup, Msg = #mqtt_message{dup = false}) -> Msg#mqtt_message{dup = true}; +set_flag(sys, Msg = #mqtt_message{sys = false}) -> + Msg#mqtt_message{sys = true}; set_flag(retain, Msg = #mqtt_message{retain = false}) -> Msg#mqtt_message{retain = true}; set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. @@ -133,7 +170,7 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message %% @end %%------------------------------------------------------------------------------ -format(#mqtt_message{msgid=MsgId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) -> - io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)", - [MsgId, Qos, Retain, Dup, Topic]). +format(#mqtt_message{msgid=MsgId, pktid = PktId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) -> + io_lib:format("Message(MsgId=~p, PktId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)", + [MsgId, PktId, Qos, Retain, Dup, Topic]). diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index f0d45ec02..863d67676 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -77,7 +77,7 @@ -define(SYSTOP_MESSAGES, [ {counter, 'messages/received'}, % Messages received {counter, 'messages/sent'}, % Messages sent - {gauge, 'messages/retained/count'},% Messagea retained + {gauge, 'messages/retained'}, % Messagea retained {gauge, 'messages/stored/count'}, % Messages stored {counter, 'messages/dropped'} % Messages dropped ]). @@ -222,9 +222,9 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= publish(Metric, Val) -> - emqttd_pubsub:publish(#mqtt_message{topic = metric_topic(Metric), - from = metrics, - payload = emqttd_util:integer_to_binary(Val)}). + Payload = emqttd_util:integer_to_binary(Val), + Msg = emqttd_message:make(metrics, metric_topic(Metric), Payload), + emqttd_pubsub:publish(Msg). create_metric({gauge, Name}) -> ets:insert(?METRIC_TAB, {{Name, 0}, 0}); diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_autosub.erl index 45461be88..86d0b3e2e 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_autosub.erl @@ -41,7 +41,7 @@ load(Opts) -> Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], - emqttd_broker:hook(client_connected, {?MODULE, client_connected}, + emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Topics]}), {ok, #state{topics = Topics}}. @@ -53,6 +53,5 @@ client_connected(_ConnAck, _Client, _Topics) -> ignore. unload(_Opts) -> - emqttd_broker:unhook(client_connected, {?MODULE, client_connected}). - + emqttd_broker:unhook('client.connected', {?MODULE, client_connected}). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 017a8418f..63edf024b 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -35,8 +35,8 @@ -export([client_connected/3, client_disconnected/3]). load(Opts) -> - emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), - emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), + emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), + emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), {ok, Opts}. client_connected(ConnAck, #mqtt_client{client_id = ClientId, @@ -55,24 +55,25 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId, {protocol, ProtoVer}, {connack, ConnAck}, {ts, emqttd_util:now_to_secs()}]), - Message = #mqtt_message{from = presence, - qos = proplists:get_value(qos, Opts, 0), - topic = topic(connected, ClientId), - payload = iolist_to_binary(Json)}, - emqttd_pubsub:publish(Message). + Msg = emqttd_message:make(presence, + proplists:get_value(qos, Opts, 0), + topic(connected, ClientId), + iolist_to_binary(Json)), + emqttd_pubsub:publish(Msg). client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, {reason, reason(Reason)}, {ts, emqttd_util:now_to_secs()}]), - emqttd_pubsub:publish(#mqtt_message{from = presence, - qos = proplists:get_value(qos, Opts, 0), - topic = topic(disconnected, ClientId), - payload = iolist_to_binary(Json)}). + Msg = emqttd_message:make(presence, + proplists:get_value(qos, Opts, 0), + topic(disconnected, ClientId), + iolist_to_binary(Json)), + emqttd_pubsub:publish(Msg). unload(_Opts) -> - emqttd_broker:unhook(client_connected, {?MODULE, client_connected}), - emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}). + emqttd_broker:unhook('client.connected', {?MODULE, client_connected}), + emqttd_broker:unhook('client.disconnected', {?MODULE, client_disconnected}). topic(connected, ClientId) -> emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index eaaf287ae..4f29b51d1 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -35,7 +35,7 @@ -export([load/1, reload/1, unload/1]). --export([rewrite/2]). +-export([rewrite/3, rewrite/4]). %%%============================================================================= %%% API @@ -45,22 +45,22 @@ load(Opts) -> File = proplists:get_value(file, Opts), {ok, Terms} = file:consult(File), Sections = compile(Terms), - emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, + emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, {?MODULE, rewrite, [subscribe, Sections]}), - emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, + emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}, {?MODULE, rewrite, [unsubscribe, Sections]}), - emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, + emqttd_broker:hook('client.publish', {?MODULE, rewrite_publish}, {?MODULE, rewrite, [publish, Sections]}). -rewrite(TopicTable, [subscribe, Sections]) -> +rewrite(_ClientId, TopicTable, subscribe, Sections) -> lager:info("rewrite subscribe: ~p", [TopicTable]), [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; -rewrite(Topics, [unsubscribe, Sections]) -> +rewrite(_ClientId, Topics, unsubscribe, Sections) -> lager:info("rewrite unsubscribe: ~p", [Topics]), - [match_topic(Topic, Sections) || Topic <- Topics]; + [match_topic(Topic, Sections) || Topic <- Topics]. -rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> +rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) -> %%TODO: this will not work if the client is always online. RewriteTopic = case get({rewrite, Topic}) of @@ -83,9 +83,9 @@ reload(File) -> end. unload(_) -> - emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), - emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}), - emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}). + emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook('client.publish', {?MODULE, rewrite_publish}). %%%============================================================================= %%% Internal functions @@ -116,7 +116,6 @@ match_rule(Topic, []) -> match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> - %%TODO: stupid??? how to replace $1, $2? Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), iolist_to_binary(lists:foldl( fun({Var, Val}, Acc) -> diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 1ade6405e..28ae04d56 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -207,7 +207,7 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> - TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable), + TopicTable1 = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable), %%TODO: GrantedQos should be renamed. {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), send(?SUBACK_PACKET(PacketId, GrantedQos), State) @@ -221,8 +221,9 @@ handle({subscribe, TopicTable}, State = #proto_state{session = Session}) -> handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); -handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics), +handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{client_id = ClientId, + session = Session}) -> + Topics1 = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics), ok = emqttd_session:unsubscribe(Session, Topics1), send(?UNSUBACK_PACKET(PacketId), State); diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 3699bfc9f..83f321fcd 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -157,19 +157,18 @@ cast(Msg) -> %% @end %%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. -publish(#mqtt_message{topic=Topic, from = From} = Msg) -> +publish(#mqtt_message{from = From} = Msg) -> trace(publish, From, Msg), - %%TODO:call hooks here... - %%Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg), + Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg), %% Retain message first. Don't create retained topic. - case emqttd_msg_store:retain(Msg) of + case emqttd_retained:retain(Msg1) of ok -> %TODO: why unset 'retain' flag? - publish(Topic, emqttd_message:unset_flag(Msg)); + publish(Topic, emqttd_message:unset_flag(Msg1)); ignore -> - publish(Topic, Msg) + publish(Topic, Msg1) end. publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) -> diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index fb2df9171..5c1b03610 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -24,7 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_msg_store). +-module(emqttd_retained). -author("Feng Lee "). @@ -37,21 +37,23 @@ -copy_mnesia({mnesia, [copy]}). %% API Function Exports --export([retain/1, redeliver/2]). +-export([retain/1, dispatch/2]). + +-record(mqtt_retained, {topic, message}). %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= mnesia(boot) -> - ok = emqttd_mnesia:create_table(message, [ + ok = emqttd_mnesia:create_table(retained, [ {type, ordered_set}, {ram_copies, [node()]}, - {record_name, mqtt_message}, - {attributes, record_info(fields, mqtt_message)}]); + {record_name, mqtt_retained}, + {attributes, record_info(fields, mqtt_retained)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(message). + ok = emqttd_mnesia:copy_table(retained). %%%============================================================================= %%% API @@ -66,7 +68,7 @@ retain(#mqtt_message{retain = false}) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - mnesia:async_dirty(fun mnesia:delete/1, [{message, Topic}]); + mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]); retain(Msg = #mqtt_message{topic = Topic, retain = true, @@ -74,10 +76,10 @@ retain(Msg = #mqtt_message{topic = Topic, TabSize = mnesia:table_info(message, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> + Retained = #mqtt_retained{topic = Topic, message = Msg}, lager:debug("Retained ~s", [emqttd_message:format(Msg)]), - mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]), - emqttd_metrics:set('messages/retained/count', - mnesia:table_info(message, size)); + mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]), + emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size)); {false, _}-> lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); {_, false}-> @@ -99,31 +101,25 @@ env() -> end. %%%----------------------------------------------------------------------------- -%% @doc Redeliver retained messages to subscribed client +%% @doc Deliver retained messages to subscribed client %% @end %%%----------------------------------------------------------------------------- --spec redeliver(Topic, CPid) -> any() when +-spec dispatch(Topic, CPid) -> any() when Topic :: binary(), CPid :: pid(). -redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> +dispatch(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> + Msgs = case emqttd_topic:wildcard(Topic) of false -> - dispatch(CPid, mnesia:dirty_read(message, Topic)); + [Msg || #mqtt_retained{message = Msg} <- mnesia:dirty_read(retained, Topic)]; true -> - Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> + Fun = fun(#mqtt_retained{topic = Name, message = Msg}, Acc) -> case emqttd_topic:match(Name, Topic) of true -> [Msg|Acc]; false -> Acc end end, - Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]), - dispatch(CPid, lists:reverse(Msgs)) - end. - -dispatch(_CPid, []) -> - ignore; -dispatch(CPid, Msgs) when is_list(Msgs) -> - [CPid ! {dispatch, Msg} || Msg <- Msgs]; -dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> - CPid ! {dispatch, Msg}. + mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained]) + end, + [CPid ! {dispatch, Msg} || Msg <- Msgs]. diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 3b2fb4382..da2bfdf72 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -78,8 +78,8 @@ %% Client Pid linked with session client_pid :: pid(), - %% Last message id of the session - message_id = 1, + %% Last packet id of the session + packet_id = 1, %% Client’s subscriptions. subscriptions :: list(), @@ -182,21 +182,21 @@ publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> %% @doc PubAck message %% @end %%------------------------------------------------------------------------------ --spec puback(pid(), mqtt_msgid()) -> ok. -puback(Session, MsgId) -> - gen_server:cast(Session, {puback, MsgId}). +-spec puback(pid(), mqtt_packet_id()) -> ok. +puback(Session, PktId) -> + gen_server:cast(Session, {puback, PktId}). --spec pubrec(pid(), mqtt_msgid()) -> ok. -pubrec(Session, MsgId) -> - gen_server:cast(Session, {pubrec, MsgId}). +-spec pubrec(pid(), mqtt_packet_id()) -> ok. +pubrec(Session, PktId) -> + gen_server:cast(Session, {pubrec, PktId}). --spec pubrel(pid(), mqtt_msgid()) -> ok. -pubrel(Session, MsgId) -> - gen_server:cast(Session, {pubrel, MsgId}). +-spec pubrel(pid(), mqtt_packet_id()) -> ok. +pubrel(Session, PktId) -> + gen_server:cast(Session, {pubrel, PktId}). --spec pubcomp(pid(), mqtt_msgid()) -> ok. -pubcomp(Session, MsgId) -> - gen_server:cast(Session, {pubcomp, MsgId}). +-spec pubcomp(pid(), mqtt_packet_id()) -> ok. +pubcomp(Session, PktId) -> + gen_server:cast(Session, {pubcomp, PktId}). %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topics @@ -258,7 +258,7 @@ handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId, %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter, %% a new Subscription is created and all matching retained messages are sent. - emqttd_msg_store:redeliver(Topic, self()), + emqttd_retained:dispatch(Topic, self()), [{Topic, Qos} | Acc] end end, Subscriptions, Topics), @@ -284,14 +284,14 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientI {reply, ok, Session#session{subscriptions = Subscriptions1}}; -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case check_awaiting_rel(Session) of true -> - TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}), - AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel), + TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), + AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; false -> lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message " @@ -326,7 +326,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> true = link(ClientPid), %% Redeliver PUBREL - [ClientPid ! {redeliver, {?PUBREL, MsgId}} || MsgId <- maps:keys(AwaitingComp)], + [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], %% Clear awaiting_ack timers [cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)], @@ -349,54 +349,54 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> {noreply, dequeue(Session2), hibernate}; %% PUBRAC -handle_cast({puback, MsgId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> - case maps:find(MsgId, Awaiting) of +handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> + case maps:find(PktId, Awaiting) of {ok, {_, TRef}} -> cancel_timer(TRef), - Session1 = acked(MsgId, Session), + Session1 = acked(PktId, Session), {noreply, dequeue(Session1)}; error -> - lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]), {noreply, Session} end; %% PUBREC -handle_cast({pubrec, MsgId}, Session = #session{client_id = ClientId, +handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, await_rel_timeout = Timeout}) -> - case maps:find(MsgId, AwaitingAck) of + case maps:find(PktId, AwaitingAck) of {ok, {_, TRef}} -> cancel_timer(TRef), - TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}), - Session1 = acked(MsgId, Session#session{awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)}), + TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), + Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}), {noreply, dequeue(Session1)}; error -> - lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), {noreply, Session} end; %% PUBREL -handle_cast({pubrel, MsgId}, Session = #session{client_id = ClientId, +handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel}) -> - case maps:find(MsgId, AwaitingRel) of + case maps:find(PktId, AwaitingRel) of {ok, {Msg, TRef}} -> cancel_timer(TRef), emqttd_pubsub:publish(Msg), - {noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}}; + {noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; error -> - lager:error("Session ~s cannot find PUBREL: msgid=~p!", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]), {noreply, Session} end; %% PUBCOMP -handle_cast({pubcomp, MsgId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) -> - case maps:find(MsgId, AwaitingComp) of +handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) -> + case maps:find(PktId, AwaitingComp) of {ok, TRef} -> cancel_timer(TRef), - {noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}}; + {noreply, Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}}; error -> - lager:error("Session ~s cannot find PUBCOMP: MsgId=~p", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]), {noreply, Session} end; @@ -428,51 +428,51 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} end; -handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = undefined, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> %% just remove awaiting - {noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}}; + {noreply, Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}}; -handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_id = ClientId, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> - case maps:find(MsgId, AwaitingAck) of + case maps:find(PktId, AwaitingAck) of {ok, {{0, _Timeout}, _TRef}} -> - Session1 = Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ), - awaiting_ack = maps:remove(MsgId, AwaitingAck)}, + Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), + awaiting_ack = maps:remove(PktId, AwaitingAck)}, {noreply, dequeue(Session1)}; {ok, {{Retries, Timeout}, _TRef}} -> - TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}), - AwaitingAck1 = maps:put(MsgId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), + TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), + AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), {noreply, Session#session{awaiting_ack = AwaitingAck1}}; error -> lager:error([{client, ClientId}], "Session ~s " - "cannot find Awaiting Ack:~p", [ClientId, MsgId]), + "cannot find Awaiting Ack:~p", [ClientId, PktId]), {noreply, Session} end; -handle_info({timeout, awaiting_rel, MsgId}, Session = #session{client_id = ClientId, +handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel}) -> - case maps:find(MsgId, AwaitingRel) of + case maps:find(PktId, AwaitingRel) of {ok, {Msg, _TRef}} -> lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n" "Drop Message:~p", [ClientId, Msg]), - {noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}}; + {noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; error -> - lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: MsgId=~p", [ClientId, MsgId]), + lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]), {noreply, Session} end; -handle_info({timeout, awaiting_comp, MsgId}, Session = #session{client_id = ClientId, +handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = Awaiting}) -> - case maps:find(MsgId, Awaiting) of + case maps:find(PktId, Awaiting) of {ok, _TRef} -> lager:error([{client, ClientId}], "Session ~s " - "Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]), - {noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}}; + "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]), + {noreply, Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}}; error -> lager:error([{client, ClientId}], "Session ~s " - "Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]), + "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]), {noreply, Session} end; @@ -566,13 +566,13 @@ dequeue2(Session = #session{message_queue = Q}) -> deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, Session; -deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{message_id = MsgId, +deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId, client_pid = ClientPid, inflight_queue = InflightQ}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - Msg1 = Msg#mqtt_message{msgid = MsgId, dup = false}, + Msg1 = Msg#mqtt_message{pktid = PktId, dup = false}, ClientPid ! {deliver, Msg1}, - await(Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1}|InflightQ]})). + await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})). redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) -> deliver(Msg, Session); @@ -585,23 +585,23 @@ redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = Client %%------------------------------------------------------------------------------ %% Awaiting ack for qos1, qos2 message %%------------------------------------------------------------------------------ -await(#mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting, +await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting, unack_retries = Retries, unack_timeout = Timeout}) -> - TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}), - Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting), + TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), + Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting), Session#session{awaiting_ack = Awaiting1}. -acked(MsgId, Session = #session{inflight_queue = InflightQ, +acked(PktId, Session = #session{inflight_queue = InflightQ, awaiting_ack = Awaiting}) -> - Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ), - awaiting_ack = maps:remove(MsgId, Awaiting)}. + Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), + awaiting_ack = maps:remove(PktId, Awaiting)}. -next_msgid(Session = #session{message_id = 16#ffff}) -> - Session#session{message_id = 1}; +next_packet_id(Session = #session{packet_id = 16#ffff}) -> + Session#session{packet_id = 1}; -next_msgid(Session = #session{message_id = MsgId}) -> - Session#session{message_id = MsgId + 1}. +next_packet_id(Session = #session{packet_id = Id}) -> + Session#session{packet_id = Id + 1}. timer(Timeout, TimeoutMsg) -> erlang:send_after(Timeout * 1000, self(), TimeoutMsg). diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 24ef12a6c..8fa51fcc2 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -175,9 +175,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= publish(Stat, Val) -> - emqttd_pubsub:publish(#mqtt_message{from = stats, - topic = stats_topic(Stat), - payload = emqttd_util:integer_to_binary(Val)}). + Msg = emqttd_message:make(stats, stats_topic(Stat), + emqttd_util:integer_to_binary(Val)), + emqttd_pubsub:publish(Msg). stats_topic(Stat) -> emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).