diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 08a3cff2a..2275164a9 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -743,12 +743,12 @@ waiting_for_connack(EventType, EventContent, State) -> false -> {stop, connack_timeout} end. -connected({call, From}, subscriptions, State = #state{subscriptions = Subscriptions}) -> - {keep_state, State, [{reply, From, maps:to_list(Subscriptions)}]}; +connected({call, From}, subscriptions, #state{subscriptions = Subscriptions}) -> + {keep_state_and_data, [{reply, From, maps:to_list(Subscriptions)}]}; connected({call, From}, info, State) -> Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))), - {keep_state, State, [{reply, From, Info}]}; + {keep_state_and_data, [{reply, From, Info}]}; connected({call, From}, pause, State) -> {keep_state, State#state{paused = true}, [{reply, From, ok}]}; @@ -756,11 +756,11 @@ connected({call, From}, pause, State) -> connected({call, From}, resume, State) -> {keep_state, State#state{paused = false}, [{reply, From, ok}]}; -connected({call, From}, get_properties, State = #state{properties = Properties}) -> - {keep_state, State, [{reply, From, Properties}]}; +connected({call, From}, get_properties, #state{properties = Properties}) -> + {keep_state_and_data, [{reply, From, Properties}]}; -connected({call, From}, client_id, State = #state{client_id = ClientId}) -> - {keep_state, State, [{reply, From, ClientId}]}; +connected({call, From}, client_id, #state{client_id = ClientId}) -> + {keep_state_and_data, [{reply, From, ClientId}]}; connected({call, From}, {set_request_handler, RequestHandler}, State) -> {keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]}; @@ -843,8 +843,8 @@ connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) -> connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State); -connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) -> - {keep_state, State}; +connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) -> + keep_state_and_data; connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload), State) when Properties =/= undefined -> @@ -897,7 +897,7 @@ connected(cast, ?PUBREL_PACKET(PacketId), end; error -> emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]), - {keep_state, State} + keep_state_and_data end; connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) -> @@ -910,7 +910,8 @@ connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), %%TODO: Merge reason codes to subscriptions? Reply = {ok, Properties, ReasonCodes}, {keep_state, NewState, [{reply, From, Reply}]}; - false -> {keep_state, State} + false -> + keep_state_and_data end; connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), @@ -923,16 +924,18 @@ connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), end, Subscriptions, Topics), {keep_state, NewState#state{subscriptions = Subscriptions1}, [{reply, From, {ok, Properties, ReasonCodes}}]}; - false -> {keep_state, State} + false -> + keep_state_and_data end; -connected(cast, ?PACKET(?PINGRESP), State = #state{pending_calls = []}) -> - {keep_state, State}; +connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) -> + keep_state_and_data; connected(cast, ?PACKET(?PINGRESP), State) -> case take_call(ping, State) of {value, #call{from = From}, NewState} -> {keep_state, NewState, [{reply, From, pong}]}; - false -> {keep_state, State} + false -> + keep_state_and_data end; connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> @@ -1009,17 +1012,17 @@ handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]), {stop, {shutdown, Reason}, State}; -handle_event(info, {inet_reply, _Sock, ok}, _, State) -> - {keep_state, State}; +handle_event(info, {inet_reply, _Sock, ok}, _, _State) -> + keep_state_and_data; handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]), {stop, {shutdown, Reason}, State}; -handle_event(EventType, EventContent, StateName, StateData) -> +handle_event(EventType, EventContent, StateName, _StateData) -> emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)", [StateName, EventType, EventContent]), - {keep_state, StateData}. + keep_state_and_data. %% Mandatory callback functions terminate(Reason, _StateName, State = #state{socket = Socket}) ->