refactor(mqttsn): avoid saving anonymous functions

This commit is contained in:
JianBo He 2020-12-10 14:44:35 +08:00 committed by JianBo He
parent 411e2c0022
commit 27d6b73c37
3 changed files with 33 additions and 37 deletions

View File

@ -87,8 +87,7 @@
enable_stats :: boolean(),
stats_timer :: maybe(reference()),
idle_timeout :: integer(),
enable_qos3 = false :: boolean(),
transform :: fun((emqx_types:packet(), #state{}) -> tuple())
enable_qos3 = false :: boolean()
}).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
@ -159,8 +158,7 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
asleep_msg_queue = [],
enable_stats = EnableStats,
enable_qos3 = EnableQos3,
idle_timeout = IdleTimeout,
transform = transform_fun()
idle_timeout = IdleTimeout
},
{ok, idle, State, [IdleTimeout]};
{error, Reason} when Reason =:= enotconn;
@ -691,13 +689,25 @@ call(Pid, Req) ->
%% Internal Functions
%%--------------------------------------------------------------------
transform(?CONNACK_PACKET(0, _SessPresent), _FuncMsgIdToTopicId, _State) ->
mqtt2sn(?CONNACK_PACKET(0, _SessPresent), _State) ->
?SN_CONNACK_MSG(0);
transform(?CONNACK_PACKET(_ReturnCode, _SessPresent), _FuncMsgIdToTopicId, _State) ->
mqtt2sn(?CONNACK_PACKET(_ReturnCode, _SessPresent), _State) ->
?SN_CONNACK_MSG(?SN_RC_CONGESTION);
transform(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), _FuncMsgIdToTopicId, #state{registry = Registry}) ->
mqtt2sn(?PUBREC_PACKET(MsgId), _State) ->
?SN_PUBREC_MSG(?SN_PUBREC, MsgId);
mqtt2sn(?PUBREL_PACKET(MsgId), _State) ->
?SN_PUBREC_MSG(?SN_PUBREL, MsgId);
mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId);
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
?SN_UNSUBACK_MSG(MsgId);
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) ->
NewPacketId = if
QoS =:= ?QOS_0 -> 0;
true -> PacketId
@ -715,35 +725,23 @@ transform(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), _FuncMsgIdToTopicId, #
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType},
?SN_PUBLISH_MSG(Flags, TopicContent, NewPacketId, Payload);
transform(?PUBACK_PACKET(MsgId, _ReasonCode), FuncMsgIdToTopicId, _State) ->
TopicIdFinal = get_topic_id(puback, MsgId, FuncMsgIdToTopicId),
?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED);
transform(?PUBREC_PACKET(MsgId), _FuncMsgIdToTopicId, _State) ->
?SN_PUBREC_MSG(?SN_PUBREC, MsgId);
transform(?PUBREL_PACKET(MsgId), _FuncMsgIdToTopicId, _State) ->
?SN_PUBREC_MSG(?SN_PUBREL, MsgId);
transform(?PUBCOMP_PACKET(MsgId), _FuncMsgIdToTopicId, _State) ->
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId);
transform(?SUBACK_PACKET(MsgId, ReturnCodes), FuncMsgIdToTopicId, _State)->
mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
% if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...)
% if failure, suback is sent in this function.
[ReturnCode | _ ] = ReturnCodes,
{QoS, TopicId, NewReturnCode}
= case ?IS_QOS(ReturnCode) of
true ->
{ReturnCode, get_topic_id(suback, MsgId, FuncMsgIdToTopicId), ?SN_RC_ACCEPTED};
{ReturnCode, get_topic_id(suback, MsgId), ?SN_RC_ACCEPTED};
_ ->
{?QOS_0, get_topic_id(suback, MsgId, FuncMsgIdToTopicId), ?SN_RC_NOT_SUPPORTED}
{?QOS_0, get_topic_id(suback, MsgId), ?SN_RC_NOT_SUPPORTED}
end,
Flags = #mqtt_sn_flags{qos = QoS},
?SN_SUBACK_MSG(Flags, TopicId, MsgId, NewReturnCode);
transform(?UNSUBACK_PACKET(MsgId), _FuncMsgIdToTopicId, _State)->
?SN_UNSUBACK_MSG(MsgId).
mqtt2sn(?PUBACK_PACKET(MsgId, _ReasonCode), _State) ->
TopicIdFinal = get_topic_id(puback, MsgId),
?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED).
send_register(TopicName, TopicId, MsgId, State) ->
send_message(?SN_REGISTER_MSG(TopicId, MsgId, TopicName), State).
@ -1033,8 +1031,8 @@ get_corrected_qos(?QOS_NEG1, State) ->
get_corrected_qos(QoS, _State) ->
QoS.
get_topic_id(Type, MsgId, Func) ->
case Func(Type, MsgId) of
get_topic_id(Type, MsgId) ->
case dequeue_msgid(Type, MsgId) of
undefined -> 0;
TopicId -> TopicId
end.
@ -1050,7 +1048,7 @@ handle_outgoing(Packets, State) when is_list(Packets) ->
lists:foreach(fun(Packet) -> handle_outgoing(Packet, State) end, Packets);
handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload),
State = #state{clientid = ClientId, registry = Registry, transform = Transform}) ->
State = #state{clientid = ClientId, registry = Registry}) ->
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
MsgId = message_id(PacketId),
?LOG(debug, "Handle outgoing: ~p", [PubPkt], State),
@ -1059,11 +1057,11 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload),
andalso (byte_size(TopicName) =/= 2)
andalso register_and_notify_client(TopicName, Payload, Dup, QoS,
Retain, MsgId, ClientId, State),
send_message(Transform(PubPkt, State), State);
send_message(mqtt2sn(PubPkt, State), State);
handle_outgoing(Packet, State = #state{transform = Transform}) ->
send_message(Transform(Packet, State), State).
handle_outgoing(Packet, State) ->
send_message(mqtt2sn(Packet, State), State).
register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, ClientId,
State = #state{registry = Registry}) ->
@ -1076,10 +1074,6 @@ message_id(undefined) ->
rand:uniform(16#FFFF);
message_id(MsgId) -> MsgId.
transform_fun() ->
FunMsgIdToTopicId = fun(Type, MsgId) -> dequeue_msgid(Type, MsgId) end,
fun(Packet, State) -> transform(Packet, FunMsgIdToTopicId, State) end.
inc_incoming_stats(Type) ->
inc_counter(recv_pkt, 1),
case Type == ?PUBLISH of

View File

@ -16,7 +16,8 @@
-module(emqx_sn_proper_types).
-include("emqx_sn.hrl").
%-include("emqx_sn.hrl").
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("proper/include/proper.hrl").
-compile({no_auto_import, [register/1]}).

View File

@ -16,7 +16,8 @@
-module(prop_emqx_sn_frame).
-include("emqx_sn.hrl").
%-include("emqx_sn.hrl").
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("proper/include/proper.hrl").
-compile({no_auto_import, [register/1]}).