From 39a48179ea29d672e4f3e5864e850b3019bb14cb Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 23 Aug 2023 12:09:40 +0300 Subject: [PATCH] chore(emqx_channel): use macros for reply construction --- apps/emqx/include/emqx_channel.hrl | 5 +++++ apps/emqx/src/emqx_channel.erl | 13 +++++++------ apps/emqx_ft/src/emqx_ft.erl | 6 ++++-- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/apps/emqx/include/emqx_channel.hrl b/apps/emqx/include/emqx_channel.hrl index be2448a20..53abcafd6 100644 --- a/apps/emqx/include/emqx_channel.hrl +++ b/apps/emqx/include/emqx_channel.hrl @@ -41,4 +41,9 @@ will_msg ]). +-define(REPLY_OUTGOING(Packets), {outgoing, Packets}). +-define(REPLY_CONNACK(Packet), {connack, Packet}). +-define(REPLY_EVENT(StateOrEvent), {event, StateOrEvent}). +-define(REPLY_CLOSE(Reason), {close, Reason}). + -define(EXPIRE_INTERVAL_INFINITE, 4294967295000). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8d0a58767..53cac2400 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -122,6 +122,7 @@ -type reply() :: {outgoing, emqx_types:packet()} | {outgoing, [emqx_types:packet()]} + | {connack, emqx_types:packet()} | {event, conn_state() | updated} | {close, Reason :: atom()}. @@ -1023,7 +1024,7 @@ handle_out(publish, [], Channel) -> {ok, Channel}; handle_out(publish, Publishes, Channel) -> {Packets, NChannel} = do_deliver(Publishes, Channel), - {ok, {outgoing, Packets}, NChannel}; + {ok, ?REPLY_OUTGOING(Packets), NChannel}; handle_out(puback, {PacketId, ReasonCode}, Channel) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubrec, {PacketId, ReasonCode}, Channel) -> @@ -1048,7 +1049,7 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel) -> handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel); handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) -> Packet = ?DISCONNECT_PACKET(ReasonCode, Props), - {ok, [{outgoing, Packet}, {close, ReasonName}], Channel}; + {ok, [?REPLY_OUTGOING(Packet), {close, ReasonName}], Channel}; handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) -> {ok, {close, ReasonName}, Channel}; handle_out(auth, {ReasonCode, Properties}, Channel) -> @@ -1062,7 +1063,7 @@ handle_out(Type, Data, Channel) -> %%-------------------------------------------------------------------- return_connack(AckPacket, Channel) -> - Replies = [{event, connected}, {connack, AckPacket}], + Replies = [?REPLY_EVENT(connected), ?REPLY_CONNACK(AckPacket)], case maybe_resume_session(Channel) of ignore -> {ok, Replies, Channel}; @@ -1073,7 +1074,7 @@ return_connack(AckPacket, Channel) -> session = NSession }, {Packets, NChannel2} = do_deliver(Publishes, NChannel1), - Outgoing = [{outgoing, Packets} || length(Packets) > 0], + Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0], {ok, Replies ++ Outgoing, NChannel2} end. @@ -1121,7 +1122,7 @@ do_deliver(Publishes, Channel) when is_list(Publishes) -> %%-------------------------------------------------------------------- return_sub_unsub_ack(Packet, Channel) -> - {ok, [{outgoing, Packet}, {event, updated}], Channel}. + {ok, [?REPLY_OUTGOING(Packet), ?REPLY_EVENT(updated)], Channel}. %%-------------------------------------------------------------------- %% Handle call @@ -1235,7 +1236,7 @@ handle_info( -> Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of - {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; + {ok, Channel2} -> {ok, ?REPLY_EVENT(disconnected), Channel2}; Shutdown -> Shutdown end; handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 521d5b10a..6a98c51f0 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -18,7 +18,9 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_channel.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). + -include_lib("snabbkaffe/include/trace.hrl"). -export([ @@ -164,7 +166,7 @@ on_channel_unregistered(ChannelPid) -> on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) -> _ = erlang:demonitor(MRef, [flush]), _ = emqx_ft_async_reply:take_by_mref(MRef), - {ok, [{outgoing, ?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)} | Acc]}; + {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]}; on_client_timeout(_TRef, _Event, Acc) -> {ok, Acc}. @@ -172,7 +174,7 @@ on_process_down(MRef, _Pid, Reason, Acc) -> case emqx_ft_async_reply:take_by_mref(MRef) of {ok, PacketId, TRef} -> _ = emqx_utils:cancel_timer(TRef), - {ok, [{outgoing, ?PUBACK_PACKET(PacketId, reason_to_rc(Reason))} | Acc]}; + {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]}; not_found -> {ok, Acc} end.