emqttd_message:make, hooks

This commit is contained in:
Feng 2015-07-05 20:18:37 +08:00
parent 669a717bb2
commit 895c9ddaed
14 changed files with 203 additions and 191 deletions

View File

@ -121,12 +121,10 @@ terminate(_, _) ->
ok.
alarm_msg(Type, AlarmId, Json) ->
#mqtt_message{from = alarm,
qos = 1,
sys = true,
topic = topic(Type, AlarmId),
payload = iolist_to_binary(Json),
timestamp = os:timestamp()}.
Msg = emqttd_message:make(alarm,
topic(Type, AlarmId),
iolist_to_binary(Json)),
emqttd_message:set_flag(sys, Msg).
topic(alert, AlarmId) ->
emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);

View File

@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) ->
true ->
true = erlang:monitor_node(Node, true),
State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}),
emqttd_pubsub:subscribe({SubTopic, ?QOS_0}),
emqttd_pubsub:subscribe({SubTopic, State#state.qos}),
{ok, State};
false ->
{stop, {cannot_connect, Node}}
@ -107,7 +107,7 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) ->
lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]),
lager:error("Bridge Dropped Msg for ~p Down: ~s", [Node, emqttd_message:format(Msg)]),
{noreply, State};
handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
@ -159,14 +159,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
%TODO: qos is not right...
transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos,
topic_prefix = Prefix,
transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix,
topic_suffix = Suffix}) ->
Msg1 =
if
Qos =:= undefined -> Msg;
true -> Msg#mqtt_message{qos = Qos}
end,
Msg1#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.
Msg#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.

View File

@ -186,7 +186,7 @@ foldl_hooks(Hook, Args, Acc0) ->
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
[{_, Hooks}] ->
lists:foldl(fun({_Name, {M, F, A}}, Acc) ->
apply(M, F, [Acc, Args++A])
apply(M, F, lists:append([Args, [Acc], A]))
end, Acc0, Hooks);
[] ->
Acc0
@ -286,23 +286,15 @@ create_topic(Topic) ->
retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),
publish(#mqtt_message{from = broker,
retain = true,
topic = <<"$SYS/brokers">>,
payload = Payload}).
Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload),
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
retain(Topic, Payload) when is_binary(Payload) ->
publish(#mqtt_message{from = broker,
retain = true,
topic = emqttd_topic:systop(Topic),
payload = Payload}).
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)).
publish(Topic, Payload) when is_binary(Payload) ->
publish( #mqtt_message{from = broker,
topic = emqttd_topic:systop(Topic),
payload = Payload}).
publish(Msg) ->
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
emqttd_pubsub:publish(Msg).
uptime(#state{started_at = Ts}) ->

View File

@ -49,14 +49,11 @@ handle_request('POST', "/mqtt/publish", Req) ->
Qos = int(get_value("qos", Params, "0")),
Retain = bool(get_value("retain", Params, "0")),
Topic = list_to_binary(get_value("topic", Params)),
Message = list_to_binary(get_value("message", Params)),
Payload = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} ->
emqttd_pubsub:publish(#mqtt_message{from = http,
qos = Qos,
retain = Retain,
topic = Topic,
payload = Message}),
Msg = emqttd_message:make(http, Qos, Topic, Payload),
emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}),
Req:ok({"text/plan", <<"ok\n">>});
{false, _} ->
Req:respond({400, [], <<"Bad QoS">>});

View File

@ -32,12 +32,39 @@
-include("emqttd_protocol.hrl").
-export([from_packet/1, from_packet/2, to_packet/1]).
-export([make/3, make/4, from_packet/1, from_packet/2, to_packet/1]).
-export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
-export([format/1]).
%%------------------------------------------------------------------------------
%% @doc Make a message
%% @end
%%------------------------------------------------------------------------------
-spec make(From, Topic, Payload) -> mqtt_message() when
From :: atom() | binary(),
Topic :: binary(),
Payload :: binary().
make(From, Topic, Payload) ->
#mqtt_message{topic = Topic,
from = From,
payload = Payload,
timestamp = os:timestamp()}.
-spec make(From, Qos, Topic, Payload) -> mqtt_message() when
From :: atom() | binary(),
Qos :: mqtt_qos(),
Topic :: binary(),
Payload :: binary().
make(From, Qos, Topic, Payload) ->
#mqtt_message{msgid = msgid(Qos),
topic = Topic,
from = From,
qos = Qos,
payload = Payload,
timestamp = os:timestamp()}.
%%------------------------------------------------------------------------------
%% @doc Message from Packet
%% @end
@ -50,12 +77,14 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
payload = Payload}) ->
#mqtt_message{msgid = PacketId,
#mqtt_message{msgid = msgid(Qos),
pktid = PacketId,
qos = Qos,
retain = Retain,
dup = Dup,
topic = Topic,
payload = Payload};
payload = Payload,
timestamp = os:timestamp()};
from_packet(#mqtt_packet_connect{will_flag = false}) ->
undefined;
@ -64,38 +93,44 @@ from_packet(#mqtt_packet_connect{will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg}) ->
#mqtt_message{retain = Retain,
qos = Qos,
#mqtt_message{msgid = msgid(Qos),
topic = Topic,
retain = Retain,
qos = Qos,
dup = false,
payload = Msg}.
payload = Msg,
timestamp = os:timestamp()}.
from_packet(ClientId, Packet) ->
Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}.
msgid(?QOS_0) ->
undefined;
msgid(_Qos) ->
emqttd_guid:gen().
%%------------------------------------------------------------------------------
%% @doc Message to packet
%% @end
%%------------------------------------------------------------------------------
-spec to_packet(mqtt_message()) -> mqtt_packet().
to_packet(#mqtt_message{msgid = MsgId,
to_packet(#mqtt_message{pktid = PkgId,
qos = Qos,
retain = Retain,
dup = Dup,
topic = Topic,
payload = Payload}) ->
PacketId = if
Qos =:= ?QOS_0 -> undefined;
true -> MsgId
end,
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = Qos,
retain = Retain,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
packet_id = if
Qos =:= ?QOS_0 -> undefined;
true -> PkgId
end
},
payload = Payload}.
%%------------------------------------------------------------------------------
@ -109,6 +144,8 @@ set_flag(Msg) ->
-spec set_flag(atom(), mqtt_message()) -> mqtt_message().
set_flag(dup, Msg = #mqtt_message{dup = false}) ->
Msg#mqtt_message{dup = true};
set_flag(sys, Msg = #mqtt_message{sys = false}) ->
Msg#mqtt_message{sys = true};
set_flag(retain, Msg = #mqtt_message{retain = false}) ->
Msg#mqtt_message{retain = true};
set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
@ -133,7 +170,7 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%% @doc Format MQTT Message
%% @end
%%------------------------------------------------------------------------------
format(#mqtt_message{msgid=MsgId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) ->
io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
[MsgId, Qos, Retain, Dup, Topic]).
format(#mqtt_message{msgid=MsgId, pktid = PktId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) ->
io_lib:format("Message(MsgId=~p, PktId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
[MsgId, PktId, Qos, Retain, Dup, Topic]).

View File

@ -77,7 +77,7 @@
-define(SYSTOP_MESSAGES, [
{counter, 'messages/received'}, % Messages received
{counter, 'messages/sent'}, % Messages sent
{gauge, 'messages/retained/count'},% Messagea retained
{gauge, 'messages/retained'}, % Messagea retained
{gauge, 'messages/stored/count'}, % Messages stored
{counter, 'messages/dropped'} % Messages dropped
]).
@ -222,9 +222,9 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
publish(Metric, Val) ->
emqttd_pubsub:publish(#mqtt_message{topic = metric_topic(Metric),
from = metrics,
payload = emqttd_util:integer_to_binary(Val)}).
Payload = emqttd_util:integer_to_binary(Val),
Msg = emqttd_message:make(metrics, metric_topic(Metric), Payload),
emqttd_pubsub:publish(Msg).
create_metric({gauge, Name}) ->
ets:insert(?METRIC_TAB, {{Name, 0}, 0});

View File

@ -41,7 +41,7 @@
load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
emqttd_broker:hook(client_connected, {?MODULE, client_connected},
emqttd_broker:hook('client.connected', {?MODULE, client_connected},
{?MODULE, client_connected, [Topics]}),
{ok, #state{topics = Topics}}.
@ -53,6 +53,5 @@ client_connected(_ConnAck, _Client, _Topics) ->
ignore.
unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, client_connected}).
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}).

View File

@ -35,8 +35,8 @@
-export([client_connected/3, client_disconnected/3]).
load(Opts) ->
emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}),
emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}),
emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
{ok, Opts}.
client_connected(ConnAck, #mqtt_client{client_id = ClientId,
@ -55,24 +55,25 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId,
{protocol, ProtoVer},
{connack, ConnAck},
{ts, emqttd_util:now_to_secs()}]),
Message = #mqtt_message{from = presence,
qos = proplists:get_value(qos, Opts, 0),
topic = topic(connected, ClientId),
payload = iolist_to_binary(Json)},
emqttd_pubsub:publish(Message).
Msg = emqttd_message:make(presence,
proplists:get_value(qos, Opts, 0),
topic(connected, ClientId),
iolist_to_binary(Json)),
emqttd_pubsub:publish(Msg).
client_disconnected(Reason, ClientId, Opts) ->
Json = mochijson2:encode([{clientid, ClientId},
{reason, reason(Reason)},
{ts, emqttd_util:now_to_secs()}]),
emqttd_pubsub:publish(#mqtt_message{from = presence,
qos = proplists:get_value(qos, Opts, 0),
topic = topic(disconnected, ClientId),
payload = iolist_to_binary(Json)}).
Msg = emqttd_message:make(presence,
proplists:get_value(qos, Opts, 0),
topic(disconnected, ClientId),
iolist_to_binary(Json)),
emqttd_pubsub:publish(Msg).
unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, client_connected}),
emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}).
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}),
emqttd_broker:unhook('client.disconnected', {?MODULE, client_disconnected}).
topic(connected, ClientId) ->
emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));

View File

@ -35,7 +35,7 @@
-export([load/1, reload/1, unload/1]).
-export([rewrite/2]).
-export([rewrite/3, rewrite/4]).
%%%=============================================================================
%%% API
@ -45,22 +45,22 @@ load(Opts) ->
File = proplists:get_value(file, Opts),
{ok, Terms} = file:consult(File),
Sections = compile(Terms),
emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe},
emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe},
{?MODULE, rewrite, [subscribe, Sections]}),
emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe},
emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe},
{?MODULE, rewrite, [unsubscribe, Sections]}),
emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish},
emqttd_broker:hook('client.publish', {?MODULE, rewrite_publish},
{?MODULE, rewrite, [publish, Sections]}).
rewrite(TopicTable, [subscribe, Sections]) ->
rewrite(_ClientId, TopicTable, subscribe, Sections) ->
lager:info("rewrite subscribe: ~p", [TopicTable]),
[{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
rewrite(Topics, [unsubscribe, Sections]) ->
rewrite(_ClientId, Topics, unsubscribe, Sections) ->
lager:info("rewrite unsubscribe: ~p", [Topics]),
[match_topic(Topic, Sections) || Topic <- Topics];
[match_topic(Topic, Sections) || Topic <- Topics].
rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) ->
rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) ->
%%TODO: this will not work if the client is always online.
RewriteTopic =
case get({rewrite, Topic}) of
@ -83,9 +83,9 @@ reload(File) ->
end.
unload(_) ->
emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}),
emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}),
emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}).
emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}),
emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
emqttd_broker:unhook('client.publish', {?MODULE, rewrite_publish}).
%%%=============================================================================
%%% Internal functions
@ -116,7 +116,6 @@ match_rule(Topic, []) ->
match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} ->
%%TODO: stupid??? how to replace $1, $2?
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),
iolist_to_binary(lists:foldl(
fun({Var, Val}, Acc) ->

View File

@ -207,7 +207,7 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id =
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
{ok, State};
false ->
TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable),
TopicTable1 = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable),
%%TODO: GrantedQos should be renamed.
{ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
send(?SUBACK_PACKET(PacketId, GrantedQos), State)
@ -221,8 +221,9 @@ handle({subscribe, TopicTable}, State = #proto_state{session = Session}) ->
handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State);
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics),
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{client_id = ClientId,
session = Session}) ->
Topics1 = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics),
ok = emqttd_session:unsubscribe(Session, Topics1),
send(?UNSUBACK_PACKET(PacketId), State);

View File

@ -157,19 +157,18 @@ cast(Msg) ->
%% @end
%%------------------------------------------------------------------------------
-spec publish(Msg :: mqtt_message()) -> ok.
publish(#mqtt_message{topic=Topic, from = From} = Msg) ->
publish(#mqtt_message{from = From} = Msg) ->
trace(publish, From, Msg),
%%TODO:call hooks here...
%%Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg),
Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg),
%% Retain message first. Don't create retained topic.
case emqttd_msg_store:retain(Msg) of
case emqttd_retained:retain(Msg1) of
ok ->
%TODO: why unset 'retain' flag?
publish(Topic, emqttd_message:unset_flag(Msg));
publish(Topic, emqttd_message:unset_flag(Msg1));
ignore ->
publish(Topic, Msg)
publish(Topic, Msg1)
end.
publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->

View File

@ -24,7 +24,7 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_msg_store).
-module(emqttd_retained).
-author("Feng Lee <feng@emqtt.io>").
@ -37,21 +37,23 @@
-copy_mnesia({mnesia, [copy]}).
%% API Function Exports
-export([retain/1, redeliver/2]).
-export([retain/1, dispatch/2]).
-record(mqtt_retained, {topic, message}).
%%%=============================================================================
%%% Mnesia callbacks
%%%=============================================================================
mnesia(boot) ->
ok = emqttd_mnesia:create_table(message, [
ok = emqttd_mnesia:create_table(retained, [
{type, ordered_set},
{ram_copies, [node()]},
{record_name, mqtt_message},
{attributes, record_info(fields, mqtt_message)}]);
{record_name, mqtt_retained},
{attributes, record_info(fields, mqtt_retained)}]);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(message).
ok = emqttd_mnesia:copy_table(retained).
%%%=============================================================================
%%% API
@ -66,7 +68,7 @@ retain(#mqtt_message{retain = false}) -> ignore;
%% RETAIN flag set to 1 and payload containing zero bytes
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
mnesia:async_dirty(fun mnesia:delete/1, [{message, Topic}]);
mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]);
retain(Msg = #mqtt_message{topic = Topic,
retain = true,
@ -74,10 +76,10 @@ retain(Msg = #mqtt_message{topic = Topic,
TabSize = mnesia:table_info(message, size),
case {TabSize < limit(table), size(Payload) < limit(payload)} of
{true, true} ->
Retained = #mqtt_retained{topic = Topic, message = Msg},
lager:debug("Retained ~s", [emqttd_message:format(Msg)]),
mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]),
emqttd_metrics:set('messages/retained/count',
mnesia:table_info(message, size));
mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
{false, _}->
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
{_, false}->
@ -99,31 +101,25 @@ env() ->
end.
%%%-----------------------------------------------------------------------------
%% @doc Redeliver retained messages to subscribed client
%% @doc Deliver retained messages to subscribed client
%% @end
%%%-----------------------------------------------------------------------------
-spec redeliver(Topic, CPid) -> any() when
-spec dispatch(Topic, CPid) -> any() when
Topic :: binary(),
CPid :: pid().
redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
dispatch(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
Msgs =
case emqttd_topic:wildcard(Topic) of
false ->
dispatch(CPid, mnesia:dirty_read(message, Topic));
[Msg || #mqtt_retained{message = Msg} <- mnesia:dirty_read(retained, Topic)];
true ->
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
Fun = fun(#mqtt_retained{topic = Name, message = Msg}, Acc) ->
case emqttd_topic:match(Name, Topic) of
true -> [Msg|Acc];
false -> Acc
end
end,
Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]),
dispatch(CPid, lists:reverse(Msgs))
end.
dispatch(_CPid, []) ->
ignore;
dispatch(CPid, Msgs) when is_list(Msgs) ->
[CPid ! {dispatch, Msg} || Msg <- Msgs];
dispatch(CPid, Msg) when is_record(Msg, mqtt_message) ->
CPid ! {dispatch, Msg}.
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained])
end,
[CPid ! {dispatch, Msg} || Msg <- Msgs].

View File

@ -78,8 +78,8 @@
%% Client Pid linked with session
client_pid :: pid(),
%% Last message id of the session
message_id = 1,
%% Last packet id of the session
packet_id = 1,
%% Clients subscriptions.
subscriptions :: list(),
@ -182,21 +182,21 @@ publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% @doc PubAck message
%% @end
%%------------------------------------------------------------------------------
-spec puback(pid(), mqtt_msgid()) -> ok.
puback(Session, MsgId) ->
gen_server:cast(Session, {puback, MsgId}).
-spec puback(pid(), mqtt_packet_id()) -> ok.
puback(Session, PktId) ->
gen_server:cast(Session, {puback, PktId}).
-spec pubrec(pid(), mqtt_msgid()) -> ok.
pubrec(Session, MsgId) ->
gen_server:cast(Session, {pubrec, MsgId}).
-spec pubrec(pid(), mqtt_packet_id()) -> ok.
pubrec(Session, PktId) ->
gen_server:cast(Session, {pubrec, PktId}).
-spec pubrel(pid(), mqtt_msgid()) -> ok.
pubrel(Session, MsgId) ->
gen_server:cast(Session, {pubrel, MsgId}).
-spec pubrel(pid(), mqtt_packet_id()) -> ok.
pubrel(Session, PktId) ->
gen_server:cast(Session, {pubrel, PktId}).
-spec pubcomp(pid(), mqtt_msgid()) -> ok.
pubcomp(Session, MsgId) ->
gen_server:cast(Session, {pubcomp, MsgId}).
-spec pubcomp(pid(), mqtt_packet_id()) -> ok.
pubcomp(Session, PktId) ->
gen_server:cast(Session, {pubcomp, PktId}).
%%------------------------------------------------------------------------------
%% @doc Unsubscribe Topics
@ -258,7 +258,7 @@ handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId,
%% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent.
emqttd_msg_store:redeliver(Topic, self()),
emqttd_retained:dispatch(Topic, self()),
[{Topic, Qos} | Acc]
end
end, Subscriptions, Topics),
@ -284,14 +284,14 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientI
{reply, ok, Session#session{subscriptions = Subscriptions1}};
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From,
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case check_awaiting_rel(Session) of
true ->
TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}),
AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel),
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
false ->
lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message "
@ -326,7 +326,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
true = link(ClientPid),
%% Redeliver PUBREL
[ClientPid ! {redeliver, {?PUBREL, MsgId}} || MsgId <- maps:keys(AwaitingComp)],
[ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
%% Clear awaiting_ack timers
[cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)],
@ -349,54 +349,54 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
{noreply, dequeue(Session2), hibernate};
%% PUBRAC
handle_cast({puback, MsgId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) ->
case maps:find(MsgId, Awaiting) of
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) ->
case maps:find(PktId, Awaiting) of
{ok, {_, TRef}} ->
cancel_timer(TRef),
Session1 = acked(MsgId, Session),
Session1 = acked(PktId, Session),
{noreply, dequeue(Session1)};
error ->
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]),
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]),
{noreply, Session}
end;
%% PUBREC
handle_cast({pubrec, MsgId}, Session = #session{client_id = ClientId,
handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp,
await_rel_timeout = Timeout}) ->
case maps:find(MsgId, AwaitingAck) of
case maps:find(PktId, AwaitingAck) of
{ok, {_, TRef}} ->
cancel_timer(TRef),
TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}),
Session1 = acked(MsgId, Session#session{awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)}),
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}),
{noreply, dequeue(Session1)};
error ->
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]),
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]),
{noreply, Session}
end;
%% PUBREL
handle_cast({pubrel, MsgId}, Session = #session{client_id = ClientId,
handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel}) ->
case maps:find(MsgId, AwaitingRel) of
case maps:find(PktId, AwaitingRel) of
{ok, {Msg, TRef}} ->
cancel_timer(TRef),
emqttd_pubsub:publish(Msg),
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}};
{noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}};
error ->
lager:error("Session ~s cannot find PUBREL: msgid=~p!", [ClientId, MsgId]),
lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]),
{noreply, Session}
end;
%% PUBCOMP
handle_cast({pubcomp, MsgId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) ->
case maps:find(MsgId, AwaitingComp) of
handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) ->
case maps:find(PktId, AwaitingComp) of
{ok, TRef} ->
cancel_timer(TRef),
{noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}};
{noreply, Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}};
error ->
lager:error("Session ~s cannot find PUBCOMP: MsgId=~p", [ClientId, MsgId]),
lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]),
{noreply, Session}
end;
@ -428,51 +428,51 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
{noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}}
end;
handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = undefined,
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
awaiting_ack = AwaitingAck}) ->
%% just remove awaiting
{noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}};
{noreply, Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}};
handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_id = ClientId,
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
inflight_queue = InflightQ,
awaiting_ack = AwaitingAck}) ->
case maps:find(MsgId, AwaitingAck) of
case maps:find(PktId, AwaitingAck) of
{ok, {{0, _Timeout}, _TRef}} ->
Session1 = Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ),
awaiting_ack = maps:remove(MsgId, AwaitingAck)},
Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
awaiting_ack = maps:remove(PktId, AwaitingAck)},
{noreply, dequeue(Session1)};
{ok, {{Retries, Timeout}, _TRef}} ->
TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}),
AwaitingAck1 = maps:put(MsgId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
{noreply, Session#session{awaiting_ack = AwaitingAck1}};
error ->
lager:error([{client, ClientId}], "Session ~s "
"cannot find Awaiting Ack:~p", [ClientId, MsgId]),
"cannot find Awaiting Ack:~p", [ClientId, PktId]),
{noreply, Session}
end;
handle_info({timeout, awaiting_rel, MsgId}, Session = #session{client_id = ClientId,
handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel}) ->
case maps:find(MsgId, AwaitingRel) of
case maps:find(PktId, AwaitingRel) of
{ok, {Msg, _TRef}} ->
lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n"
"Drop Message:~p", [ClientId, Msg]),
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}};
{noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}};
error ->
lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: MsgId=~p", [ClientId, MsgId]),
lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]),
{noreply, Session}
end;
handle_info({timeout, awaiting_comp, MsgId}, Session = #session{client_id = ClientId,
handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId,
awaiting_comp = Awaiting}) ->
case maps:find(MsgId, Awaiting) of
case maps:find(PktId, Awaiting) of
{ok, _TRef} ->
lager:error([{client, ClientId}], "Session ~s "
"Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]),
{noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}};
"Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]),
{noreply, Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}};
error ->
lager:error([{client, ClientId}], "Session ~s "
"Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]),
"Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]),
{noreply, Session}
end;
@ -566,13 +566,13 @@ dequeue2(Session = #session{message_queue = Q}) ->
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg}, Session;
deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{message_id = MsgId,
deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId,
client_pid = ClientPid,
inflight_queue = InflightQ})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
Msg1 = Msg#mqtt_message{msgid = MsgId, dup = false},
Msg1 = Msg#mqtt_message{pktid = PktId, dup = false},
ClientPid ! {deliver, Msg1},
await(Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1}|InflightQ]})).
await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})).
redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
deliver(Msg, Session);
@ -585,23 +585,23 @@ redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = Client
%%------------------------------------------------------------------------------
%% Awaiting ack for qos1, qos2 message
%%------------------------------------------------------------------------------
await(#mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting,
await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting,
unack_retries = Retries,
unack_timeout = Timeout}) ->
TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}),
Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting),
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting),
Session#session{awaiting_ack = Awaiting1}.
acked(MsgId, Session = #session{inflight_queue = InflightQ,
acked(PktId, Session = #session{inflight_queue = InflightQ,
awaiting_ack = Awaiting}) ->
Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ),
awaiting_ack = maps:remove(MsgId, Awaiting)}.
Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
awaiting_ack = maps:remove(PktId, Awaiting)}.
next_msgid(Session = #session{message_id = 16#ffff}) ->
Session#session{message_id = 1};
next_packet_id(Session = #session{packet_id = 16#ffff}) ->
Session#session{packet_id = 1};
next_msgid(Session = #session{message_id = MsgId}) ->
Session#session{message_id = MsgId + 1}.
next_packet_id(Session = #session{packet_id = Id}) ->
Session#session{packet_id = Id + 1}.
timer(Timeout, TimeoutMsg) ->
erlang:send_after(Timeout * 1000, self(), TimeoutMsg).

View File

@ -175,9 +175,9 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
publish(Stat, Val) ->
emqttd_pubsub:publish(#mqtt_message{from = stats,
topic = stats_topic(Stat),
payload = emqttd_util:integer_to_binary(Val)}).
Msg = emqttd_message:make(stats, stats_topic(Stat),
emqttd_util:integer_to_binary(Val)),
emqttd_pubsub:publish(Msg).
stats_topic(Stat) ->
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).