fix(gw-stomp): fix unknown outgoing packets

This commit is contained in:
JianBo He 2021-07-22 13:18:56 +08:00
parent 0f79ffca01
commit b28435f811
3 changed files with 9 additions and 7 deletions

View File

@ -49,7 +49,7 @@
| {event, conn_state() | updated} | {event, conn_state() | updated}
| {close, Reason :: atom()}. | {close, Reason :: atom()}.
-type replies() :: emqx_gateway_frame:packet() | reply() | [reply()]. -type replies() :: reply() | [reply()].
%% @doc Handle the incoming frame %% @doc Handle the incoming frame
-callback handle_in(emqx_gateway_frame:frame() | {frame_error, any()}, -callback handle_in(emqx_gateway_frame:frame() | {frame_error, any()},

View File

@ -87,7 +87,7 @@
| {event, conn_state()|updated} | {event, conn_state()|updated}
| {close, Reason :: atom()}). | {close, Reason :: atom()}).
-type(replies() :: emqx_sn_frame:packet() | reply() | [reply()]). -type(replies() :: reply() | [reply()]).
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
alive_timer => keepalive, alive_timer => keepalive,

View File

@ -82,7 +82,7 @@
| {event, conn_state()|updated} | {event, conn_state()|updated}
| {close, Reason :: atom()}). | {close, Reason :: atom()}).
-type(replies() :: stomp_frame() | reply() | [reply()]). -type(replies() :: reply() | [reply()]).
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
incoming_timer => keepalive, incoming_timer => keepalive,
@ -537,7 +537,7 @@ handle_out(connerr, {Headers, ReceiptId, ErrMsg}, Channel) ->
handle_out(error, {ReceiptId, ErrMsg}, Channel) -> handle_out(error, {ReceiptId, ErrMsg}, Channel) ->
Frame = error_frame(ReceiptId, ErrMsg), Frame = error_frame(ReceiptId, ErrMsg),
{ok, Frame, Channel}; {ok, {outgoing, Frame}, Channel};
handle_out(connected, Headers, Channel = #channel{ handle_out(connected, Headers, Channel = #channel{
ctx = Ctx, ctx = Ctx,
@ -554,7 +554,7 @@ handle_out(receipt, undefined, Channel) ->
{ok, Channel}; {ok, Channel};
handle_out(receipt, ReceiptId, Channel) -> handle_out(receipt, ReceiptId, Channel) ->
Frame = receipt_frame(ReceiptId), Frame = receipt_frame(ReceiptId),
{ok, Frame, Channel}. {ok, {outgoing, Frame}, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle call %% Handle call
@ -758,7 +758,7 @@ handle_timeout(_TRef, {keepalive_send, NewVal},
{error, timeout} -> {error, timeout} ->
NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt),
NChannel = Channel#channel{heartbeat = NHrtBt}, NChannel = Channel#channel{heartbeat = NHrtBt},
{ok, emqx_stomp_frame:make(heartbeat), {ok, {outgoing, emqx_stomp_frame:make(heartbeat)},
reset_timer(outgoing_timer, NChannel)}; reset_timer(outgoing_timer, NChannel)};
{ok, NHrtBt} -> {ok, NHrtBt} ->
{ok, reset_timer(outgoing_timer, {ok, reset_timer(outgoing_timer,
@ -897,7 +897,9 @@ add_action(TxId, Action, ReceiptId, Channel = #channel{transaction = Trans}) ->
NTrans = Trans#{TxId => {_StartedAt, [Action|Actions]}}, NTrans = Trans#{TxId => {_StartedAt, [Action|Actions]}},
{ok, Channel#channel{transaction = NTrans}}; {ok, Channel#channel{transaction = NTrans}};
_ -> _ ->
{ok, error_frame(ReceiptId, ["Transaction ", TxId, " not found"]), Channel} ErrFrame = error_frame(ReceiptId,
["Transaction ", TxId, " not found"]),
{ok, {outgoing, ErrFrame}, Channel}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------