diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 764a1ad31..faff38698 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -214,13 +214,13 @@ create(Topic) -> emqttd_pubsub:create(Topic). retain(Topic, Payload) when is_binary(Payload) -> - emqttd_pubsub:publish(#mqtt_message{retain = true, - topic = Topic, - payload = Payload}). + emqttd_pubsub:publish(broker, #mqtt_message{retain = true, + topic = Topic, + payload = Payload}). publish(Topic, Payload) when is_binary(Payload) -> - emqttd_pubsub:publish(#mqtt_message{topic = Topic, - payload = Payload}). + emqttd_pubsub:publish(broker, #mqtt_message{topic = Topic, + payload = Payload}). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index 8a6af1db8..a4c6a9252 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -75,13 +75,13 @@ init([]) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, - emqttd_pubsub:publish(Msg), + emqttd_pubsub:publish(event, Msg), {ok, State}; handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, - emqttd_pubsub:publish(Msg), + emqttd_pubsub:publish(event, Msg), {ok, State}; handle_event({subscribed, ClientId, TopicTable}, State) -> diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 4a24f4893..c4f95f21a 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) -> Message = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_pubsub:publish(#mqtt_message{qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + emqttd_pubsub:publish(http, #mqtt_message{qos = Qos, + retain = Retain, + topic = Topic, + payload = Message}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index ce03af907..0941b16dd 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -220,7 +220,8 @@ systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). publish(Topic, Payload) -> - emqttd_pubsub:publish(#mqtt_message{topic = Topic, payload = Payload}). + emqttd_pubsub:publish(metrics, #mqtt_message{topic = Topic, + payload = Payload}). new_metric({gauge, Name}) -> ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); diff --git a/apps/emqttd/src/emqttd_msg_store.erl b/apps/emqttd/src/emqttd_msg_store.erl index 4b9aa191b..4e03c0e03 100644 --- a/apps/emqttd/src/emqttd_msg_store.erl +++ b/apps/emqttd/src/emqttd_msg_store.erl @@ -84,7 +84,7 @@ retain(Msg = #mqtt_message{topic = Topic, lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); {_, false}-> lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)]) - end. + end, ok. limit(table) -> proplists:get_value(max_message_num, env()); diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 08de737f5..592de67e6 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -266,7 +266,7 @@ send_willmsg(_ClientId, undefined) -> ignore; %%TODO:should call session... send_willmsg(ClientId, WillMsg) -> - emqttd_pubsub:publish(WillMsg). + emqttd_pubsub:publish(ClientId, WillMsg). start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 4792d8f61..5e726b54c 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -32,6 +32,8 @@ -include_lib("emqtt/include/emqtt.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). @@ -46,7 +48,7 @@ -export([create/1, subscribe/1, unsubscribe/1, - publish/1, publish/2, + publish/2, %local node dispatch/2, match/1]). @@ -138,12 +140,19 @@ cast(Msg) -> %%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes. %%------------------------------------------------------------------------------ --spec publish(Msg :: mqtt_message()) -> ok. -publish(Msg=#mqtt_message{topic=Topic}) -> - publish(Topic, Msg). - --spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). -publish(Topic, Msg) when is_binary(Topic) -> +-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok. +publish(From, Msg=#mqtt_message{topic=Topic}) -> + lager:info("~s PUBLISH to ~s", [From, Topic]), + %% Retain message first. Don't create retained topic. + case emqttd_msg_store:retain(Msg) of + ok -> + %TODO: why unset 'retain' flag? + publish(From, Topic, emqtt_message:unset_flag(Msg)); + ignore -> + publish(From, Topic, Msg) + end. + +publish(_From, Topic, Msg) when is_binary(Topic) -> lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) -> case Node =:= node() of true -> dispatch(Name, Msg); diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 2bca04699..a2a9a3ecd 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> %%------------------------------------------------------------------------------ -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). publish(Session, ClientId, {?QOS_0, Message}) -> - emqttd_pubsub:publish(Message), Session; + emqttd_pubsub:publish(ClientId, Message), Session; publish(Session, ClientId, {?QOS_1, Message}) -> - emqttd_pubsub:publish(Message), Session; + emqttd_pubsub:publish(ClientId, Message), Session; publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> @@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> puback(SessState = #session_state{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of - {ok, Msg} -> emqttd_pubsub:publish(Msg); + {ok, Msg} -> emqttd_pubsub:publish(ClientId, Msg); error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) end, SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)};