diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 49f840362..40ae11de6 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -193,8 +193,8 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) -> puback(QoS, PacketId, ReasonCode, NPState) end; -handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> - case emqx_session:puback(PacketId, ReasonCode, Session) of +handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), PState = #protocol{session = Session}) -> + case emqx_session:puback(PacketId, Session) of {ok, Publishes, NSession} -> handle_out({publish, Publishes}, PState#protocol{session = NSession}); {ok, NSession} -> @@ -204,7 +204,7 @@ handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses end; handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> - case emqx_session:pubrec(PacketId, ReasonCode, Session) of + case emqx_session:pubrec(PacketId, Session) of {ok, NSession} -> handle_out({pubrel, PacketId}, PState#protocol{session = NSession}); {error, ReasonCode1} -> @@ -212,15 +212,15 @@ handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses end; handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> - case emqx_session:pubrel(PacketId, ReasonCode, Session) of + case emqx_session:pubrel(PacketId, Session) of {ok, NSession} -> handle_out({pubcomp, PacketId}, PState#protocol{session = NSession}); {error, ReasonCode1} -> handle_out({pubcomp, PacketId, ReasonCode1}, PState) end; -handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> - case emqx_session:pubcomp(PacketId, ReasonCode, Session) of +handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), PState = #protocol{session = Session}) -> + case emqx_session:pubcomp(PacketId, Session) of {ok, Publishes, NSession} -> handle_out({publish, Publishes}, PState#protocol{session = NSession}); {ok, NSession} -> @@ -905,4 +905,3 @@ sp(false) -> 0. flag(true) -> 1; flag(false) -> 0. - diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 5dfcfe8d2..33c396dfb 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -63,10 +63,10 @@ ]). -export([ publish/3 - , puback/3 - , pubrec/3 - , pubrel/3 - , pubcomp/3 + , puback/2 + , pubrec/2 + , pubrel/2 + , pubcomp/2 ]). -export([deliver/2]). @@ -80,6 +80,9 @@ , get_env/3 ]). +%% For test case +-export([set_pkt_id/2]). + -record(session, { %% Clean Start Flag clean_start :: boolean(), @@ -167,8 +170,8 @@ init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight, init_mqueue(Zone) -> emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000), store_qos0 => get_env(Zone, mqueue_store_qos0, true), - priorities => get_env(Zone, mqueue_priorities), - default_priority => get_env(Zone, mqueue_default_priority) + priorities => get_env(Zone, mqueue_priorities, none), + default_priority => get_env(Zone, mqueue_default_priority, lowest) }). %%-------------------------------------------------------------------- @@ -369,14 +372,17 @@ do_publish(PacketId, Msg = #message{timestamp = Ts}, %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- --spec(puback(emqx_types:packet_id(), emqx_types:reason_code(), session()) +-spec(puback(emqx_types:packet_id(), session()) -> {ok, session()} | {ok, list(publish()), session()} | {error, emqx_types:reason_code()}). -puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> +puback(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), dequeue(Session#session{inflight = Inflight1}); + {value, {_OtherPub, _Ts}} -> + ?LOG(warning, "The PacketId has been used, PacketId: ~p", [PacketId]), + {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), @@ -387,9 +393,9 @@ puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> %% Client -> Broker: PUBREC %%-------------------------------------------------------------------- --spec(pubrec(emqx_types:packet_id(), emqx_types:reason_code(), session()) +-spec(pubrec(emqx_types:packet_id(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> +pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight), @@ -408,9 +414,9 @@ pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> %% Client -> Broker: PUBREL %%-------------------------------------------------------------------- --spec(pubrel(emqx_types:packet_id(), emqx_types:reason_code(), session()) +-spec(pubrel(emqx_types:packet_id(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -pubrel(PacketId, _ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) -> +pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:take(PacketId, AwaitingRel) of {_Ts, AwaitingRel1} -> {ok, Session#session{awaiting_rel = AwaitingRel1}}; @@ -424,10 +430,10 @@ pubrel(PacketId, _ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) -> %% Client -> Broker: PUBCOMP %%-------------------------------------------------------------------- --spec(pubcomp(emqx_types:packet_id(), emqx_types:reason_code(), session()) +-spec(pubcomp(emqx_types:packet_id(), session()) -> {ok, session()} | {ok, list(publish()), session()} | {error, emqx_types:reason_code()}). -pubcomp(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> +pubcomp(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:contain(PacketId, Inflight) of true -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), @@ -658,7 +664,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)}, expire_awaiting_rel(More, Now, Session1); Age -> - ensure_await_rel_timer(Timeout - max(0, Age), Session) + {ok, ensure_await_rel_timer(Timeout - max(0, Age), Session)} end. %%-------------------------------------------------------------------- @@ -671,3 +677,10 @@ next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. +%%--------------------------------------------------------------------- +%% For Test case +%%--------------------------------------------------------------------- + + +set_pkt_id(Session, PktId) -> + Session#session{next_pkt_id = PktId}. diff --git a/test/prop_emqx_session.erl b/test/prop_emqx_session.erl new file mode 100644 index 000000000..5e137ee12 --- /dev/null +++ b/test/prop_emqx_session.erl @@ -0,0 +1,327 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_session). + +-include("emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(mock_modules, + [ emqx_metrics + , emqx_broker + , emqx_misc + , emqx_message + , emqx_hooks + , emqx_zone + , emqx_pd + ]). + +-compile(export_all). +-compile(nowarn_export_all). + +%%%%%%%%%%%%%%%%%% +%%% Properties %%% +%%%%%%%%%%%%%%%%%% +prop_session_pub(opts) -> [{numtests, 1000}]. + +prop_session_pub() -> + emqx_logger:set_log_level(emergency), + + ?SETUP(fun() -> + ok = load(?mock_modules), + fun() -> ok = unload(?mock_modules) end + end, + ?FORALL({Session, OpList}, {session(), session_op_list()}, + begin + try + apply_ops(Session, OpList), + true + after + ok + end + end)). + +%%%%%%%%%%%%%%% +%%% Helpers %%% +%%%%%%%%%%%%%%% + +apply_ops(Session, []) -> + ?assertEqual(session, element(1, Session)); +apply_ops(Session, [Op | Rest]) -> + NSession = apply_op(Session, Op), + apply_ops(NSession, Rest). + +apply_op(Session, info) -> + Info = emqx_session:info(Session), + ?assert(is_map(Info)), + ?assertEqual(16, maps:size(Info)), + Session; +apply_op(Session, attrs) -> + Attrs = emqx_session:attrs(Session), + ?assert(is_map(Attrs)), + ?assertEqual(3, maps:size(Attrs)), + Session; +apply_op(Session, stats) -> + Stats = emqx_session:stats(Session), + ?assert(is_list(Stats)), + ?assertEqual(9, length(Stats)), + Session; +apply_op(Session, {subscribe, {Client, TopicFilter, SubOpts}}) -> + case emqx_session:subscribe(Client, TopicFilter, SubOpts, Session) of + {ok, NSession} -> + NSession; + {error, ?RC_QUOTA_EXCEEDED} -> + Session + end; +apply_op(Session, {unsubscribe, {Client, TopicFilter}}) -> + case emqx_session:unsubscribe(Client, TopicFilter, Session) of + {ok, NSession} -> + NSession; + {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> + Session + end; +apply_op(Session, {publish, {PacketId, Msg}}) -> + case emqx_session:publish(PacketId, Msg, Session) of + {ok, _Msg} -> + Session; + {ok, _Deliver, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {puback, PacketId}) -> + case emqx_session:puback(PacketId, Session) of + {ok, _Msg} -> + Session; + {ok, _Deliver, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {pubrec, PacketId}) -> + case emqx_session:pubrec(PacketId, Session) of + {ok, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {pubrel, PacketId}) -> + case emqx_session:pubrel(PacketId, Session) of + {ok, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {pubcomp, PacketId}) -> + case emqx_session:pubcomp(PacketId, Session) of + {ok, _Msgs} -> + Session; + {ok, _Msgs, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {deliver, Delivers}) -> + {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session), + NSession; +apply_op(Session, {timeout, {TRef, TimeoutMsg}}) -> + case emqx_session:timeout(TRef, TimeoutMsg, Session) of + {ok, NSession} -> + NSession; + {ok, _Msg, NSession} -> + NSession + end. + +%%%%%%%%%%%%%%%%%% +%%% Generators %%% +%%%%%%%%%%%%%%%%%% +session_op_list() -> + Union = [info, + attrs, + stats, + {subscribe, sub_args()}, + {unsubscribe, unsub_args()}, + {publish, publish_args()}, + {puback, puback_args()}, + {pubrec, pubrec_args()}, + {pubrel, pubrel_args()}, + {pubcomp, pubcomp_args()}, + {deliver, deliver_args()}, + {timeout, timeout_args()} + ], + list(?LAZY(oneof(Union))). + +deliver_args() -> + list({deliver, topic(), message()}). + +timeout_args() -> + {tref(), timeout_msg()}. + +sub_args() -> + ?LET({ClientId, TopicFilter, SubOpts}, + {clientid(), topic(), sub_opts()}, + {#{client_id => ClientId}, TopicFilter, SubOpts}). + +unsub_args() -> + ?LET({ClientId, TopicFilter}, + {clientid(), topic()}, + {#{client_id => ClientId}, TopicFilter}). + +publish_args() -> + ?LET({PacketId, Message}, + {packetid(), message()}, + {PacketId, Message}). + +puback_args() -> + packetid(). + +pubrec_args() -> + packetid(). + +pubrel_args() -> + packetid(). + +pubcomp_args() -> + packetid(). + +timeout_msg() -> + oneof([retry_delivery, check_awaiting_rel]). + +tref() -> oneof([tref, undefined]). + +sub_opts() -> + ?LET({RH, RAP, NL, QOS, SHARE, SUBID}, + {rh(), rap(), nl(), qos(), share(), subid()} + , make_subopts(RH, RAP, NL, QOS, SHARE, SUBID)). + +message() -> + ?LET({QoS, Topic, Payload}, + {qos(), topic(), payload()}, + emqx_message:make(proper, QoS, Topic, Payload)). + +subid() -> integer(). + +rh() -> oneof([0, 1, 2]). + +rap() -> oneof([0, 1]). + +nl() -> oneof([0, 1]). + +qos() -> oneof([0, 1, 2]). + +share() -> binary(). + +clientid() -> binary(). + +topic() -> ?LET(No, choose(1, 10), begin + NoBin = integer_to_binary(No), + <<"topic/", NoBin/binary>> + end). + +payload() -> binary(). + +packetid() -> choose(1, 30). + +zone() -> + ?LET(Zone, [{max_subscriptions, max_subscription()}, + {upgrade_qos, upgrade_qos()}, + {retry_interval, retry_interval()}, + {max_awaiting_rel, max_awaiting_rel()}, + {await_rel_timeout, await_rel_timeout()}] + , maps:from_list(Zone)). + +max_subscription() -> frequency([{33, 0}, + {33, 1}, + {34, choose(0,10)}]). + +upgrade_qos() -> bool(). + +retry_interval() -> ?LET(Interval, choose(0, 20), Interval*1000). + +max_awaiting_rel() -> choose(0, 10). + +await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000). + +max_inflight() -> choose(0, 10). + +expiry_interval() -> ?LET(EI, choose(1, 10), EI * 3600). + +option() -> + ?LET(Option, [{max_inflight, max_inflight()}, + {expiry_interval, expiry_interval()}] + , maps:from_list(Option)). + +cleanstart() -> bool(). + +session() -> + ?LET({CleanStart, Zone, Options}, + {cleanstart(), zone(), option()}, + begin + Session = emqx_session:init(CleanStart, #{zone => Zone}, Options), + emqx_session:set_pkt_id(Session, 16#ffff) + end). + +%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Internal functions %%% +%%%%%%%%%%%%%%%%%%%%%%%%%% + +make_subopts(RH, RAP, NL, QOS, SHARE, SubId) -> + #{rh => RH, + rap => RAP, + nl => NL, + qos => QOS, + share => SHARE, + subid => SubId}. + + +load(Modules) -> + [mock(Module) || Module <- Modules], + ok. + +unload(Modules) -> + lists:foreach(fun(Module) -> + ok = meck:unload(Module) + end, Modules), + ok. + +mock(Module) -> + ok = meck:new(Module, [passthrough, no_history]), + do_mock(Module, expect(Module)). + +do_mock(emqx_metrics, Expect) -> + Expect(inc, fun(_Anything) -> ok end); +do_mock(emqx_broker, Expect) -> + Expect(subscribe, fun(_, _, _) -> ok end), + Expect(set_subopts, fun(_, _) -> ok end), + Expect(unsubscribe, fun(_) -> ok end), + Expect(publish, fun(_) -> ok end); +do_mock(emqx_misc, Expect) -> + Expect(start_timer, fun(_, _) -> tref end); +do_mock(emqx_message, Expect) -> + Expect(set_header, fun(_Hdr, _Val, Msg) -> Msg end), + Expect(is_expired, fun(_Msg) -> (rand:uniform(16) > 8) end); +do_mock(emqx_hooks, Expect) -> + Expect(run, fun(_Hook, _Args) -> ok end); +do_mock(emqx_zone, Expect) -> + Expect(get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end); +do_mock(emqx_pd, Expect) -> + Expect(update_counter, fun(_stats, _num) -> ok end). + +expect(Module) -> + fun(OldFun, NewFun) -> + ok = meck:expect(Module, OldFun, NewFun) + end.