diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index aa9a9f897..9c908c9bf 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -259,8 +259,7 @@ connected(enter, _PrevSt, State = #state{proto_state = ProtoState}) -> %% Ensure keepalive after connected successfully. Interval = emqx_protocol:info(keepalive, ProtoState), case ensure_keepalive(Interval, NState) of - ignore -> - keep_state(NState); + ignore -> keep_state(NState); {ok, KeepAlive} -> keep_state(NState#state{keepalive = KeepAlive}); {error, Reason} -> diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 3c04e4cea..1c21c29d8 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -270,7 +270,7 @@ websocket_info(Deliver = {deliver, _Topic, _Msg}, {ok, NProtoState} -> reply(State#state{proto_state = NProtoState}); {ok, Packets, NProtoState} -> - reply(Packets, State#state{proto_state = NProtoState}); + reply(enqueue(Packets, State#state{proto_state = NProtoState})); {error, Reason} -> stop(Reason, State); {error, Reason, NProtoState} -> @@ -282,7 +282,6 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> - ?LOG(debug, "Keepalive Timeout!"), stop(keepalive_timeout, State); {error, Error} -> ?LOG(error, "Keepalive error: ~p", [Error]), @@ -315,7 +314,7 @@ websocket_info({timeout, Timer, Msg}, {ok, NProtoState} -> {ok, State#state{proto_state = NProtoState}}; {ok, Packets, NProtoState} -> - reply(Packets, State#state{proto_state = NProtoState}); + reply(enqueue(Packets, State#state{proto_state = NProtoState})); {error, Reason} -> stop(Reason, State); {error, Reason, NProtoState} -> @@ -364,8 +363,7 @@ connected(State = #state{proto_state = ProtoState}) -> %% Ensure keepalive after connected successfully. Interval = emqx_protocol:info(keepalive, ProtoState), case ensure_keepalive(Interval, NState) of - ignore -> - reply(NState); + ignore -> reply(NState); {ok, KeepAlive} -> reply(NState#state{keepalive = KeepAlive}); {error, Reason} -> @@ -377,16 +375,10 @@ connected(State = #state{proto_state = ProtoState}) -> ensure_keepalive(0, _State) -> ignore; -ensure_keepalive(Interval, State = #state{proto_state = ProtoState}) -> +ensure_keepalive(Interval, #state{proto_state = ProtoState}) -> Backoff = emqx_zone:get_env(emqx_protocol:info(zone, ProtoState), keepalive_backoff, 0.75), - case emqx_keepalive:start(stat_fun(), round(Interval * Backoff), {keepalive, check}) of - {ok, KeepAlive} -> - {ok, State#state{keepalive = KeepAlive}}; - {error, Reason} -> - ?LOG(warning, "Keepalive error: ~p", [Reason]), - stop(Reason, State) - end. + emqx_keepalive:start(stat_fun(), round(Interval * Backoff), {keepalive, check}). %%-------------------------------------------------------------------- %% Process incoming data @@ -415,8 +407,7 @@ process_incoming(Data, State = #state{parse_state = ParseState}) -> %% Handle incoming packets handle_incoming(Packet = ?PACKET(Type), SuccFun, - State = #state{proto_state = ProtoState, - pendings = Pendings}) -> + State = #state{proto_state = ProtoState}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), @@ -424,9 +415,7 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun, {ok, NProtoState} -> SuccFun(State#state{proto_state = NProtoState}); {ok, OutPackets, NProtoState} -> - Pendings1 = lists:append(Pendings, OutPackets), - SuccFun(State#state{proto_state = NProtoState, - pendings = Pendings1}); + SuccFun(enqueue(OutPackets, State#state{proto_state = NProtoState})); {error, Reason, NProtoState} -> stop(Reason, State#state{proto_state = NProtoState}); {stop, Error, NProtoState} -> @@ -436,9 +425,6 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun, %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(Packet, State) when is_tuple(Packet) -> - handle_outgoing([Packet], State); - handle_outgoing(Packets, #state{serialize = Serialize}) -> Data = lists:map(Serialize, Packets), emqx_pd:update_counter(send_oct, iolist_size(Data)), @@ -471,10 +457,6 @@ inc_outgoing_stats(Type) -> %%-------------------------------------------------------------------- %% Reply or Stop -reply(Packets, State = #state{pendings = Pendings}) -> - Pendings1 = lists:append(Pendings, Packets), - reply(State#state{pendings = Pendings1}). - reply(State = #state{pendings = []}) -> {ok, State}; reply(State = #state{pendings = Pendings}) -> @@ -488,6 +470,11 @@ stop(Reason, State = #state{pendings = Pendings}) -> {reply, [Reply, close], State#state{pendings = [], reason = Reason}}. +enqueue(Packet, State) when is_record(Packet, mqtt_packet) -> + enqueue([Packet], State); +enqueue(Packets, State = #state{pendings = Pendings}) -> + State#state{pendings = lists:append(Pendings, Packets)}. + %%-------------------------------------------------------------------- %% Ensure stats timer