diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index b00bef4f4..f46ef0f0b 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -37,9 +37,7 @@ load(Topics) -> emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). on_client_connected(#{client_id := ClientId, - username := Username, - conn_mod := ConnMod - }, ?RC_SUCCESS, _ConnAttrs, Topics) -> + username := Username}, ?RC_SUCCESS, _ConnAttrs, Topics) -> Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index c6aa43bcf..0aa3a2a23 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -103,7 +103,7 @@ check_sub(Zone, Topic, SubOpts) -> do_check_sub(Flags, Caps). do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) - when Limit =/= 0 andalso Levels > Limit -> + when Limit > 0, Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c9bf3690b..49f840362 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -30,6 +30,9 @@ , caps/1 ]). +%% for tests +-export([set/3]). + -export([ init/2 , handle_in/2 , handle_req/2 @@ -94,9 +97,17 @@ info(proto_ver, #protocol{proto_ver = ProtoVer}) -> ProtoVer; info(keepalive, #protocol{keepalive = Keepalive}) -> Keepalive; +info(will_msg, #protocol{will_msg = WillMsg}) -> + WillMsg; info(topic_aliases, #protocol{topic_aliases = Aliases}) -> Aliases. +%% For tests +set(client, Client, PState) -> + PState#protocol{client = Client}; +set(session, Session, PState) -> + PState#protocol{session = Session}. + attrs(#protocol{client = Client, session = Session, proto_name = ProtoName, @@ -112,6 +123,7 @@ attrs(#protocol{client = Client, caps(#protocol{client = #{zone := Zone}}) -> emqx_mqtt_caps:get_caps(Zone). + -spec(init(emqx_types:conn(), proplists:proplist()) -> proto_state()). init(ConnInfo, Options) -> Zone = proplists:get_value(zone, Options), @@ -195,16 +207,16 @@ handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Ses case emqx_session:pubrec(PacketId, ReasonCode, Session) of {ok, NSession} -> handle_out({pubrel, PacketId}, PState#protocol{session = NSession}); - {error, ReasonCode} -> - handle_out({pubrel, PacketId, ReasonCode}, PState) + {error, ReasonCode1} -> + handle_out({pubrel, PacketId, ReasonCode1}, PState) end; handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> case emqx_session:pubrel(PacketId, ReasonCode, Session) of {ok, NSession} -> handle_out({pubcomp, PacketId}, PState#protocol{session = NSession}); - {error, ReasonCode} -> - handle_out({pubcomp, PacketId, ReasonCode}, PState) + {error, ReasonCode1} -> + handle_out({pubcomp, PacketId, ReasonCode1}, PState) end; handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 81b24087b..5dfcfe8d2 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -53,6 +53,7 @@ -export([init/3]). -export([ info/1 + , info/2 , attrs/1 , stats/1 ]). @@ -158,7 +159,7 @@ init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight, retry_interval = get_env(Zone, retry_interval, 0), awaiting_rel = #{}, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), - await_rel_timeout = get_env(Zone, await_rel_timeout), + await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000), expiry_interval = ExpiryInterval, created_at = os:timestamp() }. @@ -206,6 +207,39 @@ info(#session{clean_start = CleanStart, created_at => CreatedAt }. +info(clean_start, #session{clean_start = CleanStart}) -> + CleanStart; +info(subscriptions, #session{subscriptions = Subs}) -> + Subs; +info(max_subscriptions, #session{max_subscriptions = MaxSubs}) -> + MaxSubs; +info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) -> + UpgradeQoS; +info(inflight, #session{inflight = Inflight}) -> + emqx_inflight:size(Inflight); +info(max_inflight, #session{inflight = Inflight}) -> + emqx_inflight:max_size(Inflight); +info(retry_interval, #session{retry_interval = Interval}) -> + Interval; +info(mqueue_len, #session{mqueue = MQueue}) -> + emqx_mqueue:len(MQueue); +info(max_mqueue, #session{mqueue = MQueue}) -> + emqx_mqueue:max_len(MQueue); +info(mqueue_dropped, #session{mqueue = MQueue}) -> + emqx_mqueue:dropped(MQueue); +info(next_pkt_id, #session{next_pkt_id = PacketId}) -> + PacketId; +info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) -> + maps:size(AwaitingRel); +info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) -> + MaxAwaitingRel; +info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> + Timeout; +info(expiry_interval, #session{expiry_interval = Interval}) -> + Interval div 1000; +info(created_at, #session{created_at = CreatedAt}) -> + CreatedAt. + %%-------------------------------------------------------------------- %% Attrs of the session %%-------------------------------------------------------------------- @@ -343,7 +377,7 @@ puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) -> {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), dequeue(Session#session{inflight = Inflight1}); - false -> + none -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ok = emqx_metrics:inc('packets.puback.missed'), {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -615,7 +649,8 @@ expire_awaiting_rel([], _Now, Session) -> {ok, Session#session{await_rel_timer = undefined}}; expire_awaiting_rel([{PacketId, Ts} | More], Now, - Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> + Session = #session{awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> ok = emqx_metrics:inc('messages.qos2.expired'), diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 63aeda50e..46d800d38 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -42,63 +42,155 @@ end_per_suite(_Config) -> %% Test cases for handle_in %%-------------------------------------------------------------------- -t_handle_in_connect(_) -> - 'TODO'. +t_handle_connect(_) -> + ConnPkt = #mqtt_packet_connect{ + proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V4, + is_bridge = false, + clean_start = true, + keepalive = 30, + properties = #{}, + client_id = <<"clientid">>, + username = <<"username">>, + password = <<"passwd">> + }, + with_proto( + fun(PState) -> + {ok, ?CONNACK_PACKET(?RC_SUCCESS), PState1} + = handle_in(?CONNECT_PACKET(ConnPkt), PState), + Client = emqx_protocol:info(client, PState1), + ?assertEqual(<<"clientid">>, maps:get(client_id, Client)), + ?assertEqual(<<"username">>, maps:get(username, Client)) + end). -t_handle_in_publish(_) -> - 'TODO'. +t_handle_publish_qos0(_) -> + with_proto( + fun(PState) -> + Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>), + {ok, PState} = handle_in(Publish, PState) + end). -t_handle_in_puback(_) -> - 'TODO'. +t_handle_publish_qos1(_) -> + with_proto( + fun(PState) -> + Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), + {ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, PState), + ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)) + end). -t_handle_in_pubrec(_) -> - 'TODO'. +t_handle_publish_qos2(_) -> + with_proto( + fun(PState) -> + Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), + {ok, ?PUBREC_PACKET(1, RC), PState1} = handle_in(Publish1, PState), + Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>), + {ok, ?PUBREC_PACKET(2, RC), PState2} = handle_in(Publish2, PState1), + ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)), + Session = emqx_protocol:info(session, PState2), + ?assertEqual(2, emqx_session:info(awaiting_rel, Session)) + end). -t_handle_in_pubrel(_) -> - 'TODO'. +t_handle_puback(_) -> + with_proto( + fun(PState) -> + {ok, PState} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), PState) + end). -t_handle_in_pubcomp(_) -> - 'TODO'. +t_handle_pubrec(_) -> + with_proto( + fun(PState) -> + {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), PState} + = handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), PState) + end). -t_handle_in_subscribe(_) -> - 'TODO'. +t_handle_pubrel(_) -> + with_proto( + fun(PState) -> + {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), PState} + = handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), PState) + end). -t_handle_in_unsubscribe(_) -> - 'TODO'. +t_handle_pubcomp(_) -> + with_proto( + fun(PState) -> + {ok, PState} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), PState) + end). -t_handle_in_pingreq(_) -> - with_proto(fun(PState) -> - {ok, ?PACKET(?PINGRESP), PState} = handle_in(?PACKET(?PINGREQ), PState) - end). +t_handle_subscribe(_) -> + with_proto( + fun(PState) -> + TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], + {ok, ?SUBACK_PACKET(10, [?QOS_0]), PState1} + = handle_in(?SUBSCRIBE_PACKET(10, #{}, TopicFilters), PState), + Session = emqx_protocol:info(session, PState1), + ?assertEqual(maps:from_list(TopicFilters), + emqx_session:info(subscriptions, Session)) -t_handle_in_disconnect(_) -> - 'TODO'. + end). -t_handle_in_auth(_) -> - 'TODO'. +t_handle_unsubscribe(_) -> + with_proto( + fun(PState) -> + {ok, ?UNSUBACK_PACKET(11), PState} + = handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), PState) + end). + +t_handle_pingreq(_) -> + with_proto( + fun(PState) -> + {ok, ?PACKET(?PINGRESP), PState} = handle_in(?PACKET(?PINGREQ), PState) + end). + +t_handle_disconnect(_) -> + with_proto( + fun(PState) -> + {stop, normal, PState1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), PState), + ?assertEqual(undefined, emqx_protocol:info(will_msg, PState1)) + end). + +t_handle_auth(_) -> + with_proto( + fun(PState) -> + {ok, PState} = handle_in(?AUTH_PACKET(), PState) + end). %%-------------------------------------------------------------------- %% Test cases for handle_deliver %%-------------------------------------------------------------------- t_handle_deliver(_) -> - 'TODO'. + with_proto( + fun(PState) -> + 'TODO' + end). %%-------------------------------------------------------------------- %% Test cases for handle_out %%-------------------------------------------------------------------- -t_handle_out_conack(_) -> - 'TODO'. +t_handle_conack(_) -> + with_proto( + fun(PState) -> + 'TODO' + end). t_handle_out_publish(_) -> - 'TODO'. + with_proto( + fun(PState) -> + 'TODO' + end). t_handle_out_puback(_) -> - 'TODO'. + with_proto( + fun(PState) -> + 'TODO' + end). t_handle_out_pubrec(_) -> - 'TODO'. + with_proto( + fun(PState) -> + 'TODO' + end). t_handle_out_pubrel(_) -> 'TODO'. @@ -123,22 +215,36 @@ t_handle_out_auth(_) -> %%-------------------------------------------------------------------- t_handle_timeout(_) -> - 'TODO'. + with_proto( + fun(PState) -> + 'TODO' + end). %%-------------------------------------------------------------------- %% Test cases for terminate %%-------------------------------------------------------------------- t_terminate(_) -> - 'TODO'. + with_proto( + fun(PState) -> + 'TODO' + end). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- with_proto(Fun) -> - Fun(emqx_protocol:init(#{peername => {{127,0,0,1}, 3456}, - sockname => {{127,0,0,1}, 1883}, - conn_mod => emqx_channel}, - #{zone => ?MODULE})). + ConnInfo = #{peername => {{127,0,0,1}, 3456}, + sockname => {{127,0,0,1}, 1883}, + client_id => <<"clientid">>, + username => <<"username">> + }, + Options = [{zone, testing}], + PState = emqx_protocol:init(ConnInfo, Options), + Session = emqx_session:init(false, #{zone => testing}, + #{max_inflight => 100, + expiry_interval => 0 + }), + Fun(emqx_protocol:set(session, Session, PState)).