diff --git a/Makefile b/Makefile index abd416178..a1c202a81 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.2 +PROJECT_VERSION = 2.1.2 DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index a1dd34bcc..815ee38d9 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -37,7 +37,7 @@ -record(state, {pool, id, node, subtopic, - qos = ?QOS_2, + qos = ?QOS_0, topic_suffix = <<>>, topic_prefix = <<>>, mqueue :: emqttd_mqueue:mqueue(), @@ -74,7 +74,7 @@ init([Pool, Id, Node, Topic, Options]) -> true -> true = erlang:monitor_node(Node, true), Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqttd:subscribe(Topic, self(), [local, {share, Share}]), + emqttd:subscribe(Topic, self(), [local, {share, Share}, {qos, ?QOS_0}]), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), MQueue = emqttd_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}], diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 54d462e65..8123fe669 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -548,15 +548,15 @@ print({ClientId, _ClientPid, _Persistent, SessInfo}) -> deliver_msg, enqueue_msg, created_at], - ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight=~w, " + ?PRINT("Session(~s, clean_sess=~s, subscriptions=~w, max_inflight=~w, inflight=~w, " "mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, " "deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n", [ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]). -print(subscription, {Sub, Topic}) when is_pid(Sub) -> - ?PRINT("~p -> ~s~n", [Sub, Topic]); print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) -> ?PRINT("~p -> ~s~n", [Sub, Topic]); +print(subscription, {Sub, Topic}) when is_pid(Sub) -> + ?PRINT("~p -> ~s~n", [Sub, Topic]); print(subscription, {Sub, {_Share, Topic}}) -> ?PRINT("~s -> ~s~n", [Sub, Topic]); print(subscription, {Sub, Topic}) -> diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 692477447..25b7c3d23 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -31,7 +31,7 @@ handle_request(Req) -> handle_request(Req:get(method), Req:get(path), Req). -handle_request('GET', "/status", Req) -> +handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' -> {InternalStatus, _ProvidedStatus} = init:get_status(), AppStatus = case lists:keysearch(emqttd, 1, application:which_applications()) of diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index ba3c42036..02d23567f 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -737,16 +737,26 @@ await(Msg = #mqtt_message{pktid = PacketId}, acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> - {publish, Msg, _Ts} = Inflight:lookup(PacketId), - emqttd_hooks:run('message.acked', [ClientId, Username], Msg), - State#state{inflight = Inflight:delete(PacketId)}; + case Inflight:lookup(PacketId) of + {publish, Msg, _Ts} -> + emqttd_hooks:run('message.acked', [ClientId, Username], Msg), + State#state{inflight = Inflight:delete(PacketId)}; + _ -> + ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State), + State + end; acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> - {publish, Msg, _Ts} = Inflight:lookup(PacketId), - emqttd_hooks:run('message.acked', [ClientId, Username], Msg), - State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; + case Inflight:lookup(PacketId) of + {publish, Msg, _Ts} -> + emqttd_hooks:run('message.acked', [ClientId, Username], Msg), + State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; + {pubrel, PacketId, _Ts} -> + ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State), + State + end; acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> State#state{inflight = Inflight:delete(PacketId)}.