diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index b8909ff32..247fe7500 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -135,12 +135,11 @@ handle_cast(Msg, State) -> handle_info(timeout, State) -> stop({shutdown, timeout}, State); - -handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, - conn_name = ConnName}) -> - lager:warning("Shutdown for duplicate clientid: ~s, conn:~s", - [emqttd_protocol:clientid(ProtoState), ConnName]), - stop({shutdown, duplicate_id}, State); + +%% Asynchronous SUBACK +handle_info({suback, PacketId, GrantedQos}, State = #state{proto_state = ProtoState}) -> + {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState), + noreply(State#state{proto_state = ProtoState1}); handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), @@ -150,6 +149,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = Prot {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), noreply(State#state{proto_state = ProtoState1}); +handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, + conn_name = ConnName}) -> + lager:warning("Shutdown for duplicate clientid: ~s, conn:~s", + [emqttd_protocol:clientid(ProtoState), ConnName]), + stop({shutdown, duplicate_id}, State); + handle_info(activate_sock, State) -> noreply(run_socket(State#state{conn_state = running})); diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index ce85ddab3..b4371f525 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -240,10 +240,7 @@ process(?SUBSCRIBE_PACKET(PacketId, TopicTable), lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State); false -> - AckFun = fun(GrantedQos) -> - send(?SUBACK_PACKET(PacketId, GrantedQos), State) - end, - emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State} + emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State} end; %% protect from empty topic list diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f33e72521..12583fb5d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -167,8 +167,12 @@ destroy(SessPid, ClientId) -> subscribe(SessPid, TopicTable) -> subscribe(SessPid, TopicTable, fun(_) -> ok end). --spec subscribe(pid(), [{binary(), mqtt_qos()}], AckFun :: fun()) -> ok. -subscribe(SessPid, TopicTable, AckFun) -> +-spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok. +subscribe(SessPid, PacketId, TopicTable) -> + From = self(), + AckFun = fun(GrantedQos) -> + From ! {suback, PacketId, GrantedQos} + end, gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}). %%------------------------------------------------------------------------------ @@ -298,13 +302,13 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{ case TopicTable -- Subscriptions of [] -> - catch AckFun([Qos || {_, Qos} <- TopicTable]), + AckFun([Qos || {_, Qos} <- TopicTable]), noreply(Session); _ -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), - catch AckFun(GrantedQos), + AckFun(GrantedQos), emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 4d72ce7bb..b4f82c043 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -164,6 +164,11 @@ handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) handle_cast(_Msg, State) -> {noreply, State}. +%% Asynchronous SUBACK +handle_info({suback, PacketId, GrantedQos}, State = #client_state{proto_state = ProtoState}) -> + {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState), + noreply(State#client_state{proto_state = ProtoState1}); + handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), noreply(State#client_state{proto_state = ProtoState1});