Add test cases for emqx_protocol module

- Add emqx_session:info/2 for unit tests
- Add emqx_protocol:set/3 for unit tests
- Fix the `check_sub/3` of emqx_mqtt_caps
This commit is contained in:
Feng Lee 2019-08-09 11:33:52 +08:00
parent a2d5b834da
commit 916afc1c74
5 changed files with 198 additions and 47 deletions

View File

@ -37,9 +37,7 @@ load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{client_id := ClientId, on_client_connected(#{client_id := ClientId,
username := Username, username := Username}, ?RC_SUCCESS, _ConnAttrs, Topics) ->
conn_mod := ConnMod
}, ?RC_SUCCESS, _ConnAttrs, Topics) ->
Replace = fun(Topic) -> Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end, end,

View File

@ -103,7 +103,7 @@ check_sub(Zone, Topic, SubOpts) ->
do_check_sub(Flags, Caps). do_check_sub(Flags, Caps).
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
when Levels > Limit -> when Limit > 0, Levels > Limit ->
{error, ?RC_TOPIC_FILTER_INVALID}; {error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};

View File

@ -30,6 +30,9 @@
, caps/1 , caps/1
]). ]).
%% for tests
-export([set/3]).
-export([ init/2 -export([ init/2
, handle_in/2 , handle_in/2
, handle_req/2 , handle_req/2
@ -94,9 +97,17 @@ info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
ProtoVer; ProtoVer;
info(keepalive, #protocol{keepalive = Keepalive}) -> info(keepalive, #protocol{keepalive = Keepalive}) ->
Keepalive; Keepalive;
info(will_msg, #protocol{will_msg = WillMsg}) ->
WillMsg;
info(topic_aliases, #protocol{topic_aliases = Aliases}) -> info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
Aliases. Aliases.
%% For tests
set(client, Client, PState) ->
PState#protocol{client = Client};
set(session, Session, PState) ->
PState#protocol{session = Session}.
attrs(#protocol{client = Client, attrs(#protocol{client = Client,
session = Session, session = Session,
proto_name = ProtoName, proto_name = ProtoName,
@ -112,6 +123,7 @@ attrs(#protocol{client = Client,
caps(#protocol{client = #{zone := Zone}}) -> caps(#protocol{client = #{zone := Zone}}) ->
emqx_mqtt_caps:get_caps(Zone). emqx_mqtt_caps:get_caps(Zone).
-spec(init(emqx_types:conn(), proplists:proplist()) -> proto_state()). -spec(init(emqx_types:conn(), proplists:proplist()) -> proto_state()).
init(ConnInfo, Options) -> init(ConnInfo, Options) ->
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
@ -195,16 +207,16 @@ handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses
case emqx_session:pubrec(PacketId, ReasonCode, Session) of case emqx_session:pubrec(PacketId, ReasonCode, Session) of
{ok, NSession} -> {ok, NSession} ->
handle_out({pubrel, PacketId}, PState#protocol{session = NSession}); handle_out({pubrel, PacketId}, PState#protocol{session = NSession});
{error, ReasonCode} -> {error, ReasonCode1} ->
handle_out({pubrel, PacketId, ReasonCode}, PState) handle_out({pubrel, PacketId, ReasonCode1}, PState)
end; end;
handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
case emqx_session:pubrel(PacketId, ReasonCode, Session) of case emqx_session:pubrel(PacketId, ReasonCode, Session) of
{ok, NSession} -> {ok, NSession} ->
handle_out({pubcomp, PacketId}, PState#protocol{session = NSession}); handle_out({pubcomp, PacketId}, PState#protocol{session = NSession});
{error, ReasonCode} -> {error, ReasonCode1} ->
handle_out({pubcomp, PacketId, ReasonCode}, PState) handle_out({pubcomp, PacketId, ReasonCode1}, PState)
end; end;
handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->

View File

@ -53,6 +53,7 @@
-export([init/3]). -export([init/3]).
-export([ info/1 -export([ info/1
, info/2
, attrs/1 , attrs/1
, stats/1 , stats/1
]). ]).
@ -158,7 +159,7 @@ init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight,
retry_interval = get_env(Zone, retry_interval, 0), retry_interval = get_env(Zone, retry_interval, 0),
awaiting_rel = #{}, awaiting_rel = #{},
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
await_rel_timeout = get_env(Zone, await_rel_timeout), await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
expiry_interval = ExpiryInterval, expiry_interval = ExpiryInterval,
created_at = os:timestamp() created_at = os:timestamp()
}. }.
@ -206,6 +207,39 @@ info(#session{clean_start = CleanStart,
created_at => CreatedAt created_at => CreatedAt
}. }.
info(clean_start, #session{clean_start = CleanStart}) ->
CleanStart;
info(subscriptions, #session{subscriptions = Subs}) ->
Subs;
info(max_subscriptions, #session{max_subscriptions = MaxSubs}) ->
MaxSubs;
info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) ->
UpgradeQoS;
info(inflight, #session{inflight = Inflight}) ->
emqx_inflight:size(Inflight);
info(max_inflight, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
info(mqueue_len, #session{mqueue = MQueue}) ->
emqx_mqueue:len(MQueue);
info(max_mqueue, #session{mqueue = MQueue}) ->
emqx_mqueue:max_len(MQueue);
info(mqueue_dropped, #session{mqueue = MQueue}) ->
emqx_mqueue:dropped(MQueue);
info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
PacketId;
info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
maps:size(AwaitingRel);
info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) ->
MaxAwaitingRel;
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout;
info(expiry_interval, #session{expiry_interval = Interval}) ->
Interval div 1000;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Attrs of the session %% Attrs of the session
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -343,7 +377,7 @@ puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
{value, {Msg, _Ts}} when is_record(Msg, message) -> {value, {Msg, _Ts}} when is_record(Msg, message) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight), Inflight1 = emqx_inflight:delete(PacketId, Inflight),
dequeue(Session#session{inflight = Inflight1}); dequeue(Session#session{inflight = Inflight1});
false -> none ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'), ok = emqx_metrics:inc('packets.puback.missed'),
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@ -615,7 +649,8 @@ expire_awaiting_rel([], _Now, Session) ->
{ok, Session#session{await_rel_timer = undefined}}; {ok, Session#session{await_rel_timer = undefined}};
expire_awaiting_rel([{PacketId, Ts} | More], Now, expire_awaiting_rel([{PacketId, Ts} | More], Now,
Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> Session = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, Ts) div 1000) of case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout -> Age when Age >= Timeout ->
ok = emqx_metrics:inc('messages.qos2.expired'), ok = emqx_metrics:inc('messages.qos2.expired'),

View File

@ -42,63 +42,155 @@ end_per_suite(_Config) ->
%% Test cases for handle_in %% Test cases for handle_in
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_in_connect(_) -> t_handle_connect(_) ->
'TODO'. ConnPkt = #mqtt_packet_connect{
proto_name = <<"MQTT">>,
proto_ver = ?MQTT_PROTO_V4,
is_bridge = false,
clean_start = true,
keepalive = 30,
properties = #{},
client_id = <<"clientid">>,
username = <<"username">>,
password = <<"passwd">>
},
with_proto(
fun(PState) ->
{ok, ?CONNACK_PACKET(?RC_SUCCESS), PState1}
= handle_in(?CONNECT_PACKET(ConnPkt), PState),
Client = emqx_protocol:info(client, PState1),
?assertEqual(<<"clientid">>, maps:get(client_id, Client)),
?assertEqual(<<"username">>, maps:get(username, Client))
end).
t_handle_in_publish(_) -> t_handle_publish_qos0(_) ->
'TODO'. with_proto(
fun(PState) ->
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
{ok, PState} = handle_in(Publish, PState)
end).
t_handle_in_puback(_) -> t_handle_publish_qos1(_) ->
'TODO'. with_proto(
fun(PState) ->
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, PState),
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS))
end).
t_handle_in_pubrec(_) -> t_handle_publish_qos2(_) ->
'TODO'. with_proto(
fun(PState) ->
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, RC), PState1} = handle_in(Publish1, PState),
Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
{ok, ?PUBREC_PACKET(2, RC), PState2} = handle_in(Publish2, PState1),
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
Session = emqx_protocol:info(session, PState2),
?assertEqual(2, emqx_session:info(awaiting_rel, Session))
end).
t_handle_in_pubrel(_) -> t_handle_puback(_) ->
'TODO'. with_proto(
fun(PState) ->
{ok, PState} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_pubcomp(_) -> t_handle_pubrec(_) ->
'TODO'. with_proto(
fun(PState) ->
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), PState}
= handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_subscribe(_) -> t_handle_pubrel(_) ->
'TODO'. with_proto(
fun(PState) ->
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), PState}
= handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_unsubscribe(_) -> t_handle_pubcomp(_) ->
'TODO'. with_proto(
fun(PState) ->
{ok, PState} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_pingreq(_) -> t_handle_subscribe(_) ->
with_proto(fun(PState) -> with_proto(
fun(PState) ->
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
{ok, ?SUBACK_PACKET(10, [?QOS_0]), PState1}
= handle_in(?SUBSCRIBE_PACKET(10, #{}, TopicFilters), PState),
Session = emqx_protocol:info(session, PState1),
?assertEqual(maps:from_list(TopicFilters),
emqx_session:info(subscriptions, Session))
end).
t_handle_unsubscribe(_) ->
with_proto(
fun(PState) ->
{ok, ?UNSUBACK_PACKET(11), PState}
= handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), PState)
end).
t_handle_pingreq(_) ->
with_proto(
fun(PState) ->
{ok, ?PACKET(?PINGRESP), PState} = handle_in(?PACKET(?PINGREQ), PState) {ok, ?PACKET(?PINGRESP), PState} = handle_in(?PACKET(?PINGREQ), PState)
end). end).
t_handle_in_disconnect(_) -> t_handle_disconnect(_) ->
'TODO'. with_proto(
fun(PState) ->
{stop, normal, PState1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), PState),
?assertEqual(undefined, emqx_protocol:info(will_msg, PState1))
end).
t_handle_in_auth(_) -> t_handle_auth(_) ->
'TODO'. with_proto(
fun(PState) ->
{ok, PState} = handle_in(?AUTH_PACKET(), PState)
end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for handle_deliver %% Test cases for handle_deliver
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_deliver(_) -> t_handle_deliver(_) ->
'TODO'. with_proto(
fun(PState) ->
'TODO'
end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for handle_out %% Test cases for handle_out
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_out_conack(_) -> t_handle_conack(_) ->
'TODO'. with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_publish(_) -> t_handle_out_publish(_) ->
'TODO'. with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_puback(_) -> t_handle_out_puback(_) ->
'TODO'. with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_pubrec(_) -> t_handle_out_pubrec(_) ->
'TODO'. with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_pubrel(_) -> t_handle_out_pubrel(_) ->
'TODO'. 'TODO'.
@ -123,22 +215,36 @@ t_handle_out_auth(_) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_timeout(_) -> t_handle_timeout(_) ->
'TODO'. with_proto(
fun(PState) ->
'TODO'
end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for terminate %% Test cases for terminate
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_terminate(_) -> t_terminate(_) ->
'TODO'. with_proto(
fun(PState) ->
'TODO'
end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
with_proto(Fun) -> with_proto(Fun) ->
Fun(emqx_protocol:init(#{peername => {{127,0,0,1}, 3456}, ConnInfo = #{peername => {{127,0,0,1}, 3456},
sockname => {{127,0,0,1}, 1883}, sockname => {{127,0,0,1}, 1883},
conn_mod => emqx_channel}, client_id => <<"clientid">>,
#{zone => ?MODULE})). username => <<"username">>
},
Options = [{zone, testing}],
PState = emqx_protocol:init(ConnInfo, Options),
Session = emqx_session:init(false, #{zone => testing},
#{max_inflight => 100,
expiry_interval => 0
}),
Fun(emqx_protocol:set(session, Session, PState)).