Merge pull request #2769 from emqx/add-test-cases-for-emqx-protocol

Add test cases for emqx_protocol module
This commit is contained in:
Feng Lee 2019-08-09 11:38:37 +08:00 committed by GitHub
commit 84e3590b03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 198 additions and 47 deletions

View File

@ -37,9 +37,7 @@ load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{client_id := ClientId,
username := Username,
conn_mod := ConnMod
}, ?RC_SUCCESS, _ConnAttrs, Topics) ->
username := Username}, ?RC_SUCCESS, _ConnAttrs, Topics) ->
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end,

View File

@ -103,7 +103,7 @@ check_sub(Zone, Topic, SubOpts) ->
do_check_sub(Flags, Caps).
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
when Limit =/= 0 andalso Levels > Limit ->
when Limit > 0, Levels > Limit ->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};

View File

@ -30,6 +30,9 @@
, caps/1
]).
%% for tests
-export([set/3]).
-export([ init/2
, handle_in/2
, handle_req/2
@ -94,9 +97,17 @@ info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
ProtoVer;
info(keepalive, #protocol{keepalive = Keepalive}) ->
Keepalive;
info(will_msg, #protocol{will_msg = WillMsg}) ->
WillMsg;
info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
Aliases.
%% For tests
set(client, Client, PState) ->
PState#protocol{client = Client};
set(session, Session, PState) ->
PState#protocol{session = Session}.
attrs(#protocol{client = Client,
session = Session,
proto_name = ProtoName,
@ -112,6 +123,7 @@ attrs(#protocol{client = Client,
caps(#protocol{client = #{zone := Zone}}) ->
emqx_mqtt_caps:get_caps(Zone).
-spec(init(emqx_types:conn(), proplists:proplist()) -> proto_state()).
init(ConnInfo, Options) ->
Zone = proplists:get_value(zone, Options),
@ -195,16 +207,16 @@ handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses
case emqx_session:pubrec(PacketId, ReasonCode, Session) of
{ok, NSession} ->
handle_out({pubrel, PacketId}, PState#protocol{session = NSession});
{error, ReasonCode} ->
handle_out({pubrel, PacketId, ReasonCode}, PState)
{error, ReasonCode1} ->
handle_out({pubrel, PacketId, ReasonCode1}, PState)
end;
handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
case emqx_session:pubrel(PacketId, ReasonCode, Session) of
{ok, NSession} ->
handle_out({pubcomp, PacketId}, PState#protocol{session = NSession});
{error, ReasonCode} ->
handle_out({pubcomp, PacketId, ReasonCode}, PState)
{error, ReasonCode1} ->
handle_out({pubcomp, PacketId, ReasonCode1}, PState)
end;
handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->

View File

@ -53,6 +53,7 @@
-export([init/3]).
-export([ info/1
, info/2
, attrs/1
, stats/1
]).
@ -158,7 +159,7 @@ init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight,
retry_interval = get_env(Zone, retry_interval, 0),
awaiting_rel = #{},
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
await_rel_timeout = get_env(Zone, await_rel_timeout),
await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000),
expiry_interval = ExpiryInterval,
created_at = os:timestamp()
}.
@ -206,6 +207,39 @@ info(#session{clean_start = CleanStart,
created_at => CreatedAt
}.
info(clean_start, #session{clean_start = CleanStart}) ->
CleanStart;
info(subscriptions, #session{subscriptions = Subs}) ->
Subs;
info(max_subscriptions, #session{max_subscriptions = MaxSubs}) ->
MaxSubs;
info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) ->
UpgradeQoS;
info(inflight, #session{inflight = Inflight}) ->
emqx_inflight:size(Inflight);
info(max_inflight, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
info(mqueue_len, #session{mqueue = MQueue}) ->
emqx_mqueue:len(MQueue);
info(max_mqueue, #session{mqueue = MQueue}) ->
emqx_mqueue:max_len(MQueue);
info(mqueue_dropped, #session{mqueue = MQueue}) ->
emqx_mqueue:dropped(MQueue);
info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
PacketId;
info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
maps:size(AwaitingRel);
info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) ->
MaxAwaitingRel;
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout;
info(expiry_interval, #session{expiry_interval = Interval}) ->
Interval div 1000;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt.
%%--------------------------------------------------------------------
%% Attrs of the session
%%--------------------------------------------------------------------
@ -343,7 +377,7 @@ puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
{value, {Msg, _Ts}} when is_record(Msg, message) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
dequeue(Session#session{inflight = Inflight1});
false ->
none ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'),
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@ -615,7 +649,8 @@ expire_awaiting_rel([], _Now, Session) ->
{ok, Session#session{await_rel_timer = undefined}};
expire_awaiting_rel([{PacketId, Ts} | More], Now,
Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
Session = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout ->
ok = emqx_metrics:inc('messages.qos2.expired'),

View File

@ -42,63 +42,155 @@ end_per_suite(_Config) ->
%% Test cases for handle_in
%%--------------------------------------------------------------------
t_handle_in_connect(_) ->
'TODO'.
t_handle_connect(_) ->
ConnPkt = #mqtt_packet_connect{
proto_name = <<"MQTT">>,
proto_ver = ?MQTT_PROTO_V4,
is_bridge = false,
clean_start = true,
keepalive = 30,
properties = #{},
client_id = <<"clientid">>,
username = <<"username">>,
password = <<"passwd">>
},
with_proto(
fun(PState) ->
{ok, ?CONNACK_PACKET(?RC_SUCCESS), PState1}
= handle_in(?CONNECT_PACKET(ConnPkt), PState),
Client = emqx_protocol:info(client, PState1),
?assertEqual(<<"clientid">>, maps:get(client_id, Client)),
?assertEqual(<<"username">>, maps:get(username, Client))
end).
t_handle_in_publish(_) ->
'TODO'.
t_handle_publish_qos0(_) ->
with_proto(
fun(PState) ->
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
{ok, PState} = handle_in(Publish, PState)
end).
t_handle_in_puback(_) ->
'TODO'.
t_handle_publish_qos1(_) ->
with_proto(
fun(PState) ->
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, PState),
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS))
end).
t_handle_in_pubrec(_) ->
'TODO'.
t_handle_publish_qos2(_) ->
with_proto(
fun(PState) ->
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, RC), PState1} = handle_in(Publish1, PState),
Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
{ok, ?PUBREC_PACKET(2, RC), PState2} = handle_in(Publish2, PState1),
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
Session = emqx_protocol:info(session, PState2),
?assertEqual(2, emqx_session:info(awaiting_rel, Session))
end).
t_handle_in_pubrel(_) ->
'TODO'.
t_handle_puback(_) ->
with_proto(
fun(PState) ->
{ok, PState} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_pubcomp(_) ->
'TODO'.
t_handle_pubrec(_) ->
with_proto(
fun(PState) ->
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), PState}
= handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_subscribe(_) ->
'TODO'.
t_handle_pubrel(_) ->
with_proto(
fun(PState) ->
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), PState}
= handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_unsubscribe(_) ->
'TODO'.
t_handle_pubcomp(_) ->
with_proto(
fun(PState) ->
{ok, PState} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), PState)
end).
t_handle_in_pingreq(_) ->
with_proto(fun(PState) ->
t_handle_subscribe(_) ->
with_proto(
fun(PState) ->
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
{ok, ?SUBACK_PACKET(10, [?QOS_0]), PState1}
= handle_in(?SUBSCRIBE_PACKET(10, #{}, TopicFilters), PState),
Session = emqx_protocol:info(session, PState1),
?assertEqual(maps:from_list(TopicFilters),
emqx_session:info(subscriptions, Session))
end).
t_handle_unsubscribe(_) ->
with_proto(
fun(PState) ->
{ok, ?UNSUBACK_PACKET(11), PState}
= handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), PState)
end).
t_handle_pingreq(_) ->
with_proto(
fun(PState) ->
{ok, ?PACKET(?PINGRESP), PState} = handle_in(?PACKET(?PINGREQ), PState)
end).
t_handle_in_disconnect(_) ->
'TODO'.
t_handle_disconnect(_) ->
with_proto(
fun(PState) ->
{stop, normal, PState1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), PState),
?assertEqual(undefined, emqx_protocol:info(will_msg, PState1))
end).
t_handle_in_auth(_) ->
'TODO'.
t_handle_auth(_) ->
with_proto(
fun(PState) ->
{ok, PState} = handle_in(?AUTH_PACKET(), PState)
end).
%%--------------------------------------------------------------------
%% Test cases for handle_deliver
%%--------------------------------------------------------------------
t_handle_deliver(_) ->
'TODO'.
with_proto(
fun(PState) ->
'TODO'
end).
%%--------------------------------------------------------------------
%% Test cases for handle_out
%%--------------------------------------------------------------------
t_handle_out_conack(_) ->
'TODO'.
t_handle_conack(_) ->
with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_publish(_) ->
'TODO'.
with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_puback(_) ->
'TODO'.
with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_pubrec(_) ->
'TODO'.
with_proto(
fun(PState) ->
'TODO'
end).
t_handle_out_pubrel(_) ->
'TODO'.
@ -123,22 +215,36 @@ t_handle_out_auth(_) ->
%%--------------------------------------------------------------------
t_handle_timeout(_) ->
'TODO'.
with_proto(
fun(PState) ->
'TODO'
end).
%%--------------------------------------------------------------------
%% Test cases for terminate
%%--------------------------------------------------------------------
t_terminate(_) ->
'TODO'.
with_proto(
fun(PState) ->
'TODO'
end).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
with_proto(Fun) ->
Fun(emqx_protocol:init(#{peername => {{127,0,0,1}, 3456},
ConnInfo = #{peername => {{127,0,0,1}, 3456},
sockname => {{127,0,0,1}, 1883},
conn_mod => emqx_channel},
#{zone => ?MODULE})).
client_id => <<"clientid">>,
username => <<"username">>
},
Options = [{zone, testing}],
PState = emqx_protocol:init(ConnInfo, Options),
Session = emqx_session:init(false, #{zone => testing},
#{max_inflight => 100,
expiry_interval => 0
}),
Fun(emqx_protocol:set(session, Session, PState)).