retain message when publish

This commit is contained in:
Ery Lee 2015-04-20 03:26:09 +08:00
parent ab84b6ff09
commit 229bcb6873
8 changed files with 34 additions and 24 deletions

View File

@ -214,13 +214,13 @@ create(Topic) ->
emqttd_pubsub:create(Topic). emqttd_pubsub:create(Topic).
retain(Topic, Payload) when is_binary(Payload) -> retain(Topic, Payload) when is_binary(Payload) ->
emqttd_pubsub:publish(#mqtt_message{retain = true, emqttd_pubsub:publish(broker, #mqtt_message{retain = true,
topic = Topic, topic = Topic,
payload = Payload}). payload = Payload}).
publish(Topic, Payload) when is_binary(Payload) -> publish(Topic, Payload) when is_binary(Payload) ->
emqttd_pubsub:publish(#mqtt_message{topic = Topic, emqttd_pubsub:publish(broker, #mqtt_message{topic = Topic,
payload = Payload}). payload = Payload}).
uptime(#state{started_at = Ts}) -> uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,

View File

@ -75,13 +75,13 @@ init([]) ->
handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)},
emqttd_pubsub:publish(Msg), emqttd_pubsub:publish(event, Msg),
{ok, State}; {ok, State};
handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
emqttd_pubsub:publish(Msg), emqttd_pubsub:publish(event, Msg),
{ok, State}; {ok, State};
handle_event({subscribed, ClientId, TopicTable}, State) -> handle_event({subscribed, ClientId, TopicTable}, State) ->

View File

@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) ->
Message = list_to_binary(get_value("message", Params)), Message = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} -> {true, true} ->
emqttd_pubsub:publish(#mqtt_message{qos = Qos, emqttd_pubsub:publish(http, #mqtt_message{qos = Qos,
retain = Retain, retain = Retain,
topic = Topic, topic = Topic,
payload = Message}), payload = Message}),
Req:ok({"text/plan", <<"ok\n">>}); Req:ok({"text/plan", <<"ok\n">>});
{false, _} -> {false, _} ->
Req:respond({400, [], <<"Bad QoS">>}); Req:respond({400, [], <<"Bad QoS">>});

View File

@ -220,7 +220,8 @@ systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
publish(Topic, Payload) -> publish(Topic, Payload) ->
emqttd_pubsub:publish(#mqtt_message{topic = Topic, payload = Payload}). emqttd_pubsub:publish(metrics, #mqtt_message{topic = Topic,
payload = Payload}).
new_metric({gauge, Name}) -> new_metric({gauge, Name}) ->
ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); ets:insert(?METRIC_TABLE, {{Name, 0}, 0});

View File

@ -84,7 +84,7 @@ retain(Msg = #mqtt_message{topic = Topic,
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
{_, false}-> {_, false}->
lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)]) lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)])
end. end, ok.
limit(table) -> limit(table) ->
proplists:get_value(max_message_num, env()); proplists:get_value(max_message_num, env());

View File

@ -266,7 +266,7 @@ send_willmsg(_ClientId, undefined) ->
ignore; ignore;
%%TODO:should call session... %%TODO:should call session...
send_willmsg(ClientId, WillMsg) -> send_willmsg(ClientId, WillMsg) ->
emqttd_pubsub:publish(WillMsg). emqttd_pubsub:publish(ClientId, WillMsg).
start_keepalive(0) -> ignore; start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 -> start_keepalive(Sec) when Sec > 0 ->

View File

@ -32,6 +32,8 @@
-include_lib("emqtt/include/emqtt.hrl"). -include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
%% Mnesia Callbacks %% Mnesia Callbacks
-export([mnesia/1]). -export([mnesia/1]).
@ -46,7 +48,7 @@
-export([create/1, -export([create/1,
subscribe/1, subscribe/1,
unsubscribe/1, unsubscribe/1,
publish/1, publish/2, publish/2,
%local node %local node
dispatch/2, match/1]). dispatch/2, match/1]).
@ -138,12 +140,19 @@ cast(Msg) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Publish to cluster nodes. %% @doc Publish to cluster nodes.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(Msg :: mqtt_message()) -> ok. -spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
publish(Msg=#mqtt_message{topic=Topic}) -> publish(From, Msg=#mqtt_message{topic=Topic}) ->
publish(Topic, Msg). lager:info("~s PUBLISH to ~s", [From, Topic]),
%% Retain message first. Don't create retained topic.
-spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). case emqttd_msg_store:retain(Msg) of
publish(Topic, Msg) when is_binary(Topic) -> ok ->
%TODO: why unset 'retain' flag?
publish(From, Topic, emqtt_message:unset_flag(Msg));
ignore ->
publish(From, Topic, Msg)
end.
publish(_From, Topic, Msg) when is_binary(Topic) ->
lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) -> lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) ->
case Node =:= node() of case Node =:= node() of
true -> dispatch(Name, Msg); true -> dispatch(Name, Msg);

View File

@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session().
publish(Session, ClientId, {?QOS_0, Message}) -> publish(Session, ClientId, {?QOS_0, Message}) ->
emqttd_pubsub:publish(Message), Session; emqttd_pubsub:publish(ClientId, Message), Session;
publish(Session, ClientId, {?QOS_1, Message}) -> publish(Session, ClientId, {?QOS_1, Message}) ->
emqttd_pubsub:publish(Message), Session; emqttd_pubsub:publish(ClientId, Message), Session;
publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId,
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
puback(SessState = #session_state{clientid = ClientId, puback(SessState = #session_state{clientid = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of case maps:find(PacketId, Awaiting) of
{ok, Msg} -> emqttd_pubsub:publish(Msg); {ok, Msg} -> emqttd_pubsub:publish(ClientId, Msg);
error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)};