From cefff9c0eb4a06ef3a17e9934a2c7df1c6b4a992 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 27 Jul 2015 13:41:47 +0800 Subject: [PATCH 1/4] misc --- src/emqttd_session.erl | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 930454667..4a8a35933 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -354,13 +354,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> %% Dequeue pending messages noreply(dequeue(Session2)); -%% PUBRAC -handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> - case maps:find(PktId, Awaiting) of +%% PUBACK +handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) -> + case maps:find(PktId, AwaitingAck) of {ok, {_, TRef}} -> cancel_timer(TRef), - Session1 = acked(PktId, Session), - noreply(dequeue(Session1)); + noreply(dequeue(acked(PktId, Session))); error -> lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]), noreply(Session) @@ -375,7 +374,8 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, {ok, {_, TRef}} -> cancel_timer(TRef), TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), - Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}), + AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), + Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), noreply(dequeue(Session1)); error -> lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), @@ -439,9 +439,9 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde %% just remove awaiting noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, inflight_queue = InflightQ, - awaiting_ack = AwaitingAck}) -> + awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of {ok, {{0, _Timeout}, _TRef}} -> Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), @@ -564,10 +564,11 @@ dequeue(Session) -> dequeue2(Session = #session{message_queue = Q}) -> case emqttd_mqueue:out(Q) of - {empty, _Q} -> Session; + {empty, _Q} -> + Session; {{value, Msg}, Q1} -> - Session1 = deliver(Msg, Session#session{message_queue = Q1}), - dequeue(Session1) %% dequeue more + %% dequeue more + dequeue(deliver(Msg, Session#session{message_queue = Q1})) end. deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> @@ -608,8 +609,8 @@ acked(PktId, Session = #session{client_id = ClientId, false -> lager:error("Session(~s) cannot find acked message: ~p", [PktId]) end, - Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), - awaiting_ack = maps:remove(PktId, Awaiting)}. + Session#session{awaiting_ack = maps:remove(PktId, Awaiting), + inflight_queue = lists:keydelete(PktId, 1, InflightQ)}. next_packet_id(Session = #session{packet_id = 16#ffff}) -> Session#session{packet_id = 1}; @@ -627,3 +628,4 @@ cancel_timer(Ref) -> noreply(State) -> {noreply, State, hibernate}. + From 0c5594bfcf207133961a90c7617deb38f420148e Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 27 Jul 2015 13:48:54 +0800 Subject: [PATCH 2/4] rm 'and comp --- src/emqttd_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 4a8a35933..f4f6d1c2d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -110,7 +110,7 @@ %% Max Packets that Awaiting PUBREL max_awaiting_rel = 100, - %% Awaiting timers for ack, rel and comp. + %% Awaiting timers for ack, rel. awaiting_ack :: map(), %% Retries to resend the unacked messages From c276b6c6d15aeb52c09b19f3f010de648ec6a09f Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 28 Jul 2015 10:52:10 +0800 Subject: [PATCH 3/4] QOS_I --- src/emqttd_message.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 8f3d4ed40..d8e55e202 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -59,7 +59,7 @@ make(From, Topic, Payload) -> Topic :: binary(), Payload :: binary(). make(From, Qos, Topic, Payload) -> - #mqtt_message{msgid = msgid(Qos), + #mqtt_message{msgid = msgid(?QOS_I(Qos)), topic = Topic, from = From, qos = ?QOS_I(Qos), From ca075f8f21c2fe76c97541172cbb167621d71e16 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 28 Jul 2015 11:05:50 +0800 Subject: [PATCH 4/4] pick_worker --- src/emqttd_pooler.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index b284c240e..3f0245250 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -51,14 +51,14 @@ start_link(I) -> %% @end %%------------------------------------------------------------------------------ submit(Fun) -> - gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity). + gen_server:call(gproc_pool:pick_worker(pooler), {submit, Fun}, infinity). %%------------------------------------------------------------------------------ %% @doc Submit work to pooler asynchronously %% @end %%------------------------------------------------------------------------------ async_submit(Fun) -> - gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}). + gen_server:cast(gproc_pool:pick_worker(pooler), {async_submit, Fun}). %%%============================================================================= %%% gen_server callbacks