From b28435f8114d683f949bc33fd0ec6776910aad39 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 22 Jul 2021 13:18:56 +0800 Subject: [PATCH] fix(gw-stomp): fix unknown outgoing packets --- apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl | 2 +- apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl | 2 +- apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl | 12 +++++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl index be385e720..1a032a017 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl @@ -49,7 +49,7 @@ | {event, conn_state() | updated} | {close, Reason :: atom()}. --type replies() :: emqx_gateway_frame:packet() | reply() | [reply()]. +-type replies() :: reply() | [reply()]. %% @doc Handle the incoming frame -callback handle_in(emqx_gateway_frame:frame() | {frame_error, any()}, diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index bfa8f6fba..9d08fb8a1 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -87,7 +87,7 @@ | {event, conn_state()|updated} | {close, Reason :: atom()}). --type(replies() :: emqx_sn_frame:packet() | reply() | [reply()]). +-type(replies() :: reply() | [reply()]). -define(TIMER_TABLE, #{ alive_timer => keepalive, diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 9027c515e..db3e2ea5d 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -82,7 +82,7 @@ | {event, conn_state()|updated} | {close, Reason :: atom()}). --type(replies() :: stomp_frame() | reply() | [reply()]). +-type(replies() :: reply() | [reply()]). -define(TIMER_TABLE, #{ incoming_timer => keepalive, @@ -537,7 +537,7 @@ handle_out(connerr, {Headers, ReceiptId, ErrMsg}, Channel) -> handle_out(error, {ReceiptId, ErrMsg}, Channel) -> Frame = error_frame(ReceiptId, ErrMsg), - {ok, Frame, Channel}; + {ok, {outgoing, Frame}, Channel}; handle_out(connected, Headers, Channel = #channel{ ctx = Ctx, @@ -554,7 +554,7 @@ handle_out(receipt, undefined, Channel) -> {ok, Channel}; handle_out(receipt, ReceiptId, Channel) -> Frame = receipt_frame(ReceiptId), - {ok, Frame, Channel}. + {ok, {outgoing, Frame}, Channel}. %%-------------------------------------------------------------------- %% Handle call @@ -758,7 +758,7 @@ handle_timeout(_TRef, {keepalive_send, NewVal}, {error, timeout} -> NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), NChannel = Channel#channel{heartbeat = NHrtBt}, - {ok, emqx_stomp_frame:make(heartbeat), + {ok, {outgoing, emqx_stomp_frame:make(heartbeat)}, reset_timer(outgoing_timer, NChannel)}; {ok, NHrtBt} -> {ok, reset_timer(outgoing_timer, @@ -897,7 +897,9 @@ add_action(TxId, Action, ReceiptId, Channel = #channel{transaction = Trans}) -> NTrans = Trans#{TxId => {_StartedAt, [Action|Actions]}}, {ok, Channel#channel{transaction = NTrans}}; _ -> - {ok, error_frame(ReceiptId, ["Transaction ", TxId, " not found"]), Channel} + ErrFrame = error_frame(ReceiptId, + ["Transaction ", TxId, " not found"]), + {ok, {outgoing, ErrFrame}, Channel} end. %%--------------------------------------------------------------------