diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 94916637d..99a595929 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -615,13 +615,13 @@ handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), case maybe_resume_session(Channel2) of ignore -> - {ok, [{enter, connected}, {outgoing, AckPacket}], Channel2}; + {ok, [{connack, AckPacket}], Channel2}; {ok, Publishes, NSession} -> Channel3 = Channel2#channel{session = NSession, resuming = false, pendings = []}, {ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3), - {ok, [{enter, connected}, {outgoing, [AckPacket|Packets]}], Channel3} + {ok, [{connack, AckPacket}, {outgoing, Packets}], Channel3} end; handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo, @@ -880,7 +880,7 @@ interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(retry_timer, #channel{session = Session}) -> emqx_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> - emqx_session:info(await_rel_timeout, Session); + emqx_session:info(awaiting_rel_timeout, Session); interval(expire_timer, #channel{conninfo = ConnInfo}) -> timer:seconds(maps:get(expiry_interval, ConnInfo)); interval(will_timer, #channel{will_msg = WillMsg}) -> @@ -1068,13 +1068,13 @@ check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, end. %% Check Pub Alias +%% TODO: Fixme later check_pub_alias(#mqtt_packet{ variable = #mqtt_packet_publish{ properties = #{'Topic-Alias' := AliasId} } }, #channel{alias_maximum = Limits}) -> - %% TODO: Move to Protocol case (Limits == undefined) orelse (Max = maps:get(inbound, Limits, 0)) == 0 orelse (AliasId > Max) of @@ -1219,10 +1219,14 @@ reply(Reply, Channel) -> {reply, Reply, Channel}. -compile({inline, [shutdown/2]}). +shutdown(success, Channel) -> + shutdown(normal, Channel); shutdown(Reason, Channel) -> {shutdown, Reason, Channel}. -compile({inline, [shutdown/3]}). +shutdown(success, Reply, Channel) -> + shutdown(normal, Reply, Channel); shutdown(Reason, Reply, Channel) -> {shutdown, Reason, Reply, Channel}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 1da494dfa..b557d42b7 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -516,9 +516,20 @@ send(IoData, State = #state{transport = Transport, %%-------------------------------------------------------------------- %% Handle Info -handle_info({enter, _}, State = #state{active_n = ActiveN, - sockstate = SockSt, - channel = Channel}) -> +handle_info({connack, ConnAck}, State = #state{active_n = ActiveN, + sockstate = SockSt, + channel = Channel}) -> + NState = handle_outgoing(ConnAck, State), + ChanAttrs = emqx_channel:attrs(Channel), + SockAttrs = #{active_n => ActiveN, + sockstate => SockSt + }, + Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), + handle_info({register, Attrs, stats(State)}, NState); + +handle_info({enter, disconnected}, State = #state{active_n = ActiveN, + sockstate = SockSt, + channel = Channel}) -> ChanAttrs = emqx_channel:attrs(Channel), SockAttrs = #{active_n => ActiveN, sockstate => SockSt diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 67ac6e3ce..6879dcc06 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -116,7 +116,7 @@ %% Max Packets Awaiting PUBREL max_awaiting_rel :: non_neg_integer(), %% Awaiting PUBREL Timeout - await_rel_timeout :: timeout(), + awaiting_rel_timeout :: timeout(), %% Deliver Stats deliver_stats :: emqx_types:stats(), %% Created at @@ -135,7 +135,7 @@ mqueue_max, retry_interval, awaiting_rel_max, - await_rel_timeout, + awaiting_rel_timeout, created_at ]). @@ -151,7 +151,7 @@ next_pkt_id, awaiting_rel, awaiting_rel_max, - await_rel_timeout, + awaiting_rel_timeout, created_at ]). @@ -183,7 +183,7 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> retry_interval = get_env(Zone, retry_interval, 0), awaiting_rel = #{}, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), - await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000), + awaiting_rel_timeout = get_env(Zone, awaiting_rel_timeout, 3600*1000), created_at = erlang:system_time(second) }. @@ -236,13 +236,17 @@ info(mqueue_dropped, #session{mqueue = MQueue}) -> info(next_pkt_id, #session{next_pkt_id = PacketId}) -> PacketId; info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) -> - maps:values(AwaitingRel); -info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) -> AwaitingRel; +info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) -> + maps:size(AwaitingRel); info(awaiting_rel_max, #session{max_awaiting_rel = MaxAwaitingRel}) -> MaxAwaitingRel; -info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> +info(awaiting_rel_timeout, #session{awaiting_rel_timeout = Timeout}) -> Timeout; +info(enqueue_cnt, #session{deliver_stats = undefined}) -> + 0; +info(enqueue_cnt, #session{deliver_stats = Stats}) -> + maps:get(enqueue_cnt, Stats, 0); info(deliver_stats, #session{deliver_stats = Stats}) -> Stats; info(created_at, #session{created_at = CreatedAt}) -> @@ -338,8 +342,7 @@ unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) - %%-------------------------------------------------------------------- -spec(publish(emqx_types:packet_id(), emqx_types:message(), session()) - -> {ok, emqx_types:publish_result()} | - {ok, emqx_types:publish_result(), session()} | + -> {ok, emqx_types:publish_result(), session()} | {error, emqx_types:reason_code()}). publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) -> case is_awaiting_full(Session) of @@ -350,8 +353,8 @@ publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) -> end; %% Publish QoS0/1 directly -publish(_PacketId, Msg, _Session) -> - {ok, emqx_broker:publish(Msg)}. +publish(_PacketId, Msg, Session) -> + {ok, emqx_broker:publish(Msg), Session}. is_awaiting_full(#session{max_awaiting_rel = 0}) -> false; @@ -621,11 +624,11 @@ expire_awaiting_rel([], _Now, Session) -> expire_awaiting_rel([{PacketId, Ts} | More], Now, Session = #session{awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> + awaiting_rel_timeout = Timeout}) -> case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> ok = emqx_metrics:inc('messages.qos2.expired'), - ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), + ?LOG(warning, "Dropped qos2 packet ~s for awaiting_rel_timeout", [PacketId]), Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)}, expire_awaiting_rel(More, Now, Session1); Age -> @@ -648,6 +651,8 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> inc_deliver_stats(Key, Session) -> inc_deliver_stats(Key, 1, Session). +inc_deliver_stats(Key, I, Session = #session{deliver_stats = undefined}) -> + Session#session{deliver_stats = #{Key => I}}; inc_deliver_stats(Key, I, Session = #session{deliver_stats = Stats}) -> NStats = maps:update_with(Key, fun(V) -> V+I end, I, Stats), Session#session{deliver_stats = NStats}. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 8b78961f3..99226ec6e 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -93,7 +93,9 @@ info(sockname, #state{sockname = Sockname}) -> info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(channel, #state{channel = Channel}) -> - emqx_channel:info(Channel). + emqx_channel:info(Channel); +info(stop_reason, #state{stop_reason = Reason}) -> + Reason. -spec(stats(pid()|state()) -> emqx_types:stats()). stats(WsPid) when is_pid(WsPid) -> @@ -290,7 +292,14 @@ handle_call(From, Req, State = #state{channel = Channel}) -> %%-------------------------------------------------------------------- %% Handle Info -handle_info({enter, _}, State = #state{channel = Channel}) -> +handle_info({connack, ConnAck}, State = #state{channel = Channel}) -> + ChanAttrs = emqx_channel:attrs(Channel), + SockAttrs = maps:from_list(info(?INFO_KEYS, State)), + Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), + ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel), + reply(enqueue(ConnAck, State)); + +handle_info({enter, disconnected}, State = #state{channel = Channel}) -> ChanAttrs = emqx_channel:attrs(Channel), SockAttrs = maps:from_list(info(?INFO_KEYS, State)), Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 7d8c62433..f8a910a47 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -51,21 +51,34 @@ end_per_suite(_Config) -> ok. init_per_testcase(_TestCase, Config) -> + %% CM Meck + ok = meck:new(emqx_cm, [passthrough, no_history]), + %% Access Control Meck + ok = meck:new(emqx_access_control, [passthrough, no_history]), + ok = meck:expect(emqx_access_control, authenticate, + fun(_) -> {ok, #{auth_result => success}} end), + ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end), + %% Broker Meck ok = meck:new(emqx_broker, [passthrough, no_history]), + %% Hooks Meck ok = meck:new(emqx_hooks, [passthrough, no_history]), ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end), + %% Session Meck ok = meck:new(emqx_session, [passthrough, no_history]), + %% Metrics ok = meck:new(emqx_metrics, [passthrough, no_history]), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), Config. end_per_testcase(_TestCase, Config) -> + ok = meck:unload(emqx_access_control), ok = meck:unload(emqx_metrics), ok = meck:unload(emqx_session), ok = meck:unload(emqx_broker), ok = meck:unload(emqx_hooks), + ok = meck:unload(emqx_cm), Config. %%-------------------------------------------------------------------- @@ -106,9 +119,12 @@ t_chan_init(_) -> %%-------------------------------------------------------------------- t_handle_in_connect_packet_sucess(_) -> - ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}), - {ok, {connack, ConnAck}, Channel} - = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel()), + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, 0)}], Channel} + = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel(#{conn_state => idle})), ClientInfo = emqx_channel:info(clientinfo, Channel), ?assertMatch(#{clientid := <<"clientid">>, username := <<"username">> @@ -117,8 +133,8 @@ t_handle_in_connect_packet_sucess(_) -> t_handle_in_unexpected_connect_packet(_) -> Channel = emqx_channel:set_field(conn_state, connected, channel()), - Result = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel), - ?assertEqual({shutdown, protocol_error, Channel}, Result). + {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Channel} + = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel). t_handle_in_qos0_publish(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> ok end), @@ -133,15 +149,16 @@ t_handle_in_qos1_publish(_) -> Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), {ok, ?PUBACK_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel), ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)), - ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)). + ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_qos2_publish(_) -> ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end), + ok = meck:expect(emqx_session, info, fun(awaiting_rel_timeout, _Session) -> 300000 end), Channel = channel(#{conn_state => connected}), Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel), ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)), - ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)). + ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_puback_ok(_) -> Msg = emqx_message:make(<<"t">>, <<"payload">>), @@ -156,18 +173,16 @@ t_handle_in_puback_id_in_use(_) -> fun(_, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), - Channel = channel(#{conn_state => connected}), - PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS), - {ok, Channel} = emqx_channel:handle_in(PubAck, Channel). + {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()), + ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_in_puback_id_not_found(_) -> ok = meck:expect(emqx_session, puback, fun(_, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), - Channel = channel(#{conn_state => connected}), - PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS), - {ok, Channel} = emqx_channel:handle_in(PubAck, Channel). + {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()), + ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_in_pubrec_ok(_) -> Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>), @@ -183,18 +198,20 @@ t_handle_in_pubrec_id_in_use(_) -> fun(_, Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), - Channel = channel(#{conn_state => connected}), {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} - = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). + = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()), + ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, + emqx_channel:info(pub_stats, Channel)). t_handle_in_pubrec_id_not_found(_) -> ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), - Channel = channel(#{conn_state => connected}), {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel} - = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). + = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()), + ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, + emqx_channel:info(pub_stats, Channel)). t_handle_in_pubrel_ok(_) -> ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end), @@ -209,15 +226,13 @@ t_handle_in_pubrel_not_found_error(_) -> fun(_PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), - Channel = channel(#{conn_state => connected}), - {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel} - = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel). + {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} + = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubcomp_ok(_) -> ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end), - Channel = channel(#{conn_state => connected}), - {ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel), - ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)). + {ok, Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()), + ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_in_pubcomp_not_found_error(_) -> ok = meck:expect(emqx_session, pubcomp, @@ -270,35 +285,38 @@ t_handle_in_frame_error(_) -> {shutdown, frame_too_large, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), _} = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan), ConnectedChan = channel(#{conn_state => connected}), - {shutdown, frame_too_large, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} + {shutdown, malformed_Packet, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan), DisconnectedChan = channel(#{conn_state => disconnected}), {ok, DisconnectedChan} = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan). +%% TODO: t_handle_in_expected_packet(_) -> - {ok, _Chan} = emqx_channel:handle_in(packet, channel()). + {shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _Chan} + = emqx_channel:handle_in(packet, channel()). t_process_connect(_) -> ok = meck:expect(emqx_cm, open_session, fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - ConnPkt = connpkt(), - {ok, ?CONNACK_PACKET(?RC_SUCCESS), ConnPkt, _} - = emqx_channel:process_connect(ConnPkt, channel()). + {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Channel} + = emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})). -t_handle_publish(_) -> - Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), - {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel} - = emqx_channel:handle_publish(Publish, channel()). +t_handle_publish_qos0(_) -> + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), + Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), + {ok, _Channel} = emqx_channel:handle_publish(Publish, channel()). t_process_publish_qos1(_) -> + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>), - {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel} + {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} = emqx_channel:process_publish(1, Msg, channel()). t_process_subscribe(_) -> + ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, channel()). @@ -312,11 +330,17 @@ t_process_unsubscribe(_) -> %%-------------------------------------------------------------------- t_handle_out_delivers(_) -> - ok = emqx_meck:expect(emqx_session, deliver, - fun(Delivers, Session) -> - Msgs = [Msg || {deliver, _, Msg} <- Delivers], - [{publish, PacketId, Msg} || {PacketId, Msg} <- lists:zip(lists:seq(1, length(Msgs)), Msgs)] - end), + WithPacketId = fun(Msgs) -> + lists:zip(lists:seq(1, length(Msgs)), Msgs) + end, + ok = meck:expect(emqx_session, deliver, + fun(Delivers, Session) -> + Msgs = [Msg || {deliver, _, Msg} <- Delivers], + Publishes = [{publish, PacketId, Msg} + || {PacketId, Msg} <- WithPacketId(Msgs)], + {ok, Publishes, Session} + end), + ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20000 end), Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>), Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>), Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], @@ -334,7 +358,7 @@ t_handle_out_publishes(_) -> t_handle_out_publish(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, <<"payload">>), _Channel} + {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} = emqx_channel:handle_out({publish, 1, Msg}, channel()). t_handle_out_publish_nl(_) -> @@ -345,13 +369,12 @@ t_handle_out_publish_nl(_) -> {ok, Channel} = emqx_channel:handle_out(Publish, Channel). t_handle_out_connack_sucess(_) -> - Channel = channel(#{conn_state => connected}), - {ok, {connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}, _Chan} - = emqx_channel:handle_out({connack, ?RC_SUCCESS, 0, connpkt()}, Channel). + {ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan} + = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()). t_handle_out_connack_failure(_) -> {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} - = emqx_channel:handle_out({connack, ?RC_NOT_AUTHORIZED, connpkt()}, channel()). + = emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()). t_handle_out_puback(_) -> Channel = channel(#{conn_state => connected}), @@ -362,33 +385,33 @@ t_handle_out_puback(_) -> t_handle_out_pubrec(_) -> Channel = channel(#{conn_state => connected}), {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), NChannel} - = emqx_channel:handle_out({pubrec, 1, ?RC_SUCCESS}, Channel), + = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel), ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_out_pubrel(_) -> Channel = channel(#{conn_state => connected}), {ok, ?PUBREL_PACKET(1), Channel1} - = emqx_channel:handle_out({pubrel, 1, ?RC_SUCCESS}, Channel), + = emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel), {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), Channel2} - = emqx_channel:handle_out({pubrel, 2, ?RC_SUCCESS}, Channel1), + = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1), ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)). t_handle_out_pubcomp(_) -> {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel} - = emqx_channel:handle_out({pubcomp, 2, ?RC_SUCCESS}, channel()), + = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()), ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_out_suback(_) -> {ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel} - = emqx_channel:handle_out({suback, 1, [?QOS_2]}, channel()). + = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()). t_handle_out_unsuback(_) -> {ok, ?UNSUBACK_PACKET(1, [?RC_SUCCESS]), _Channel} - = emqx_channel:handle_out({unsuback, 1, [?RC_SUCCESS]}, channel()). + = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()). t_handle_out_disconnect(_) -> {shutdown, normal, ?DISCONNECT_PACKET(?RC_SUCCESS), _Chan} - = emqx_channel:handle_out({disconnect, ?RC_SUCCESS}, channel()). + = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()). t_handle_out_unexpected(_) -> {ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()). @@ -430,26 +453,40 @@ t_handle_info_unsubscribe(_) -> {ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()). t_handle_info_sock_closed(_) -> - {ok, _Chan} = emqx_channel:handle_out({sock_closed, reason}, channel(#{conn_state => disconnected})). + {ok, _Chan} = emqx_channel:handle_out({sock_closed, reason}, + channel(#{conn_state => disconnected})). %%-------------------------------------------------------------------- %% Test cases for handle_timeout %%-------------------------------------------------------------------- t_handle_timeout_emit_stats(_) -> - {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {emit_stats, []}, channel()). + ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end), + TRef = make_ref(), + Channel = emqx_channel:set_field(timers, #{stats_timer => TRef}, channel()), + {ok, _Chan} = emqx_channel:handle_timeout(TRef, {emit_stats, []}, Channel). t_handle_timeout_keepalive(_) -> + TRef = make_ref(), + Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, channel()). t_handle_timeout_retry_delivery(_) -> - {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), retry_delivery, channel()). + ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end), + TRef = make_ref(), + Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()), + {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, channel()). t_handle_timeout_expire_awaiting_rel(_) -> - {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()). + ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end), + TRef = make_ref(), + Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()), + {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel). t_handle_timeout_expire_session(_) -> - {shutdown, expired, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()). + TRef = make_ref(), + Channel = emqx_channel:set_field(timers, #{expire_timer => TRef}, channel()), + {shutdown, expired, _Chan} = emqx_channel:handle_timeout(TRef, expire_session, Channel). t_handle_timeout_will_message(_) -> {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), will_message, channel()). @@ -471,7 +508,6 @@ t_check_flapping(_) -> ok = emqx_channel:check_flapping(connpkt(), channel()). t_auth_connect(_) -> - ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {ok, #{}} end), {ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()). t_process_alias(_) -> @@ -481,15 +517,22 @@ t_process_alias(_) -> = emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel). t_check_pub_acl(_) -> + ok = meck:new(emqx_zone, [passthrough, no_history]), + ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end), Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), - ok = emqx_channel:check_pub_acl(Publish, channel()). + ok = emqx_channel:check_pub_acl(Publish, channel()), + ok = meck:unload(emqx_zone). t_check_pub_alias(_) -> Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}}, - ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, channel()). + Channel = emqx_channel:set_field(alias_maximum, #{inbound => 10}, channel()), + ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel). t_check_subscribe(_) -> - ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()). + ok = meck:new(emqx_zone, [passthrough, no_history]), + ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end), + ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()), + ok = meck:unload(emqx_zone). t_enrich_caps(_) -> ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]), diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 1fb41576f..621fe4aa5 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -48,7 +48,7 @@ end_per_testcase(_TestCase, Config) -> %%-------------------------------------------------------------------- t_session_init(_) -> - Session = emqx_session:init(#{zone => external}, #{receive_maximum => 64}), + Session = emqx_session:init(#{zone => zone}, #{receive_maximum => 64}), ?assertEqual(#{}, emqx_session:info(subscriptions, Session)), ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)), ?assertEqual(0, emqx_session:info(subscriptions_max, Session)), @@ -67,30 +67,28 @@ t_session_init(_) -> %%-------------------------------------------------------------------- t_session_info(_) -> + Info = emqx_session:info(session()), ?assertMatch(#{subscriptions := #{}, subscriptions_max := 0, upgrade_qos := false, - inflight := 0, - inflight_max := 64, + inflight_max := 0, retry_interval := 0, mqueue_len := 0, mqueue_max := 1000, mqueue_dropped := 0, next_pkt_id := 1, - awaiting_rel := 0, - awaiting_rel_max := 0, - await_rel_timeout := 3600000 - }, emqx_session:info(session())). + awaiting_rel := #{}, + awaiting_rel_max := 100, + awaiting_rel_timeout := 3600000 + }, Info). t_session_attrs(_) -> Attrs = emqx_session:attrs(session()), - io:format("~p~n", [Attrs]), - error('TODO'). + io:format("~p~n", [Attrs]). t_session_stats(_) -> Stats = emqx_session:stats(session()), - io:format("~p~n", [Stats]), - error('TODO'). + io:format("~p~n", [Stats]). %%-------------------------------------------------------------------- %% Test cases for pub/sub @@ -125,7 +123,7 @@ t_publish_qos2(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>), {ok, [], Session} = emqx_session:publish(1, Msg, session()), - ?assertEqual(awaiting_rel_cnt, emqx_session:info(awaiting_rel_cnt, Session)). + ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)). t_publish_qos1(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), @@ -151,7 +149,7 @@ t_puback(_) -> Inflight = emqx_inflight:insert(1, {Msg, os:timestamp()}, emqx_inflight:new()), Session = set_field(inflight, Inflight, session()), {ok, Msg, NSession} = emqx_session:puback(1, Session), - ?assertEqual([], emqx_session:info(inflight, NSession)). + ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)). t_puback_error_packet_id_in_use(_) -> Inflight = emqx_inflight:insert(1, {pubrel, os:timestamp()}, emqx_inflight:new()), @@ -166,14 +164,14 @@ t_pubrec(_) -> Inflight = emqx_inflight:insert(2, {Msg, os:timestamp()}, emqx_inflight:new()), Session = set_field(inflight, Inflight, session()), {ok, Msg, NSession} = emqx_session:pubrec(2, Session), - ?assertMatch([{pubrel, _}], emqx_session:info(inflight, NSession)). + ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, NSession))). -t_pubrec_error_packet_id_in_use(_) -> +t_pubrec_packet_id_in_use_error(_) -> Inflight = emqx_inflight:insert(1, {pubrel, ts()}, emqx_inflight:new()), Session = set_field(inflight, Inflight, session()), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, session()). + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session). -t_pubrec_error_packet_id_not_found(_) -> +t_pubrec_packet_id_not_found_error(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()). t_pubrel(_) -> @@ -188,7 +186,7 @@ t_pubcomp(_) -> Inflight = emqx_inflight:insert(2, {pubrel, os:timestamp()}, emqx_inflight:new()), Session = emqx_session:set_field(inflight, Inflight, session()), {ok, NSession} = emqx_session:pubcomp(2, Session), - ?assertEqual([], emqx_session:info(inflight, NSession)). + ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)). t_pubcomp_id_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(2, session()). @@ -200,36 +198,39 @@ t_pubcomp_id_not_found(_) -> t_dequeue(_) -> {ok, Session} = emqx_session:dequeue(session()). -t_bach_n(_) -> - error('TODO'). - -t_dequeue_with_msgs(_) -> - error('TODO'). - t_deliver(_) -> - error('TODO'). + Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], + {ok, Publishes, _Session} = emqx_session:deliver(Delivers, session()), + ?assertEqual(2, length(Publishes)). t_enqueue(_) -> - error('TODO'). + Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], + Session = emqx_session:enqueue(Delivers, session()), + ?assertEqual(2, emqx_session:info(mqueue_len, Session)). t_retry(_) -> - error('TODO'). + {ok, _Session} = emqx_session:retry(session()). %%-------------------------------------------------------------------- %% Test cases for takeover/resume %%-------------------------------------------------------------------- t_takeover(_) -> - error('TODO'). + ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end), + Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}), + ok = emqx_session:takeover(Session). t_resume(_) -> - error('TODO'). + ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), + Subs = #{<<"t">> => ?DEFAULT_SUBOPTS}, + Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}), + ok = emqx_session:resume(<<"clientid">>, Session). t_redeliver(_) -> - error('TODO'). + {ok, [], _Session} = emqx_session:redeliver(session()). t_expire(_) -> - error('TODO'). + {ok, _Session} = emqx_session:expire(awaiting_rel, session()). %%-------------------------------------------------------------------- %% Helper functions @@ -254,5 +255,8 @@ subopts() -> subopts(#{}). subopts(Init) -> maps:merge(?DEFAULT_SUBOPTS, Init). +delivery(QoS, Topic) -> + {deliver, Topic, emqx_message:make(test, QoS, Topic, <<"payload">>)}. + ts() -> erlang:system_time(second). diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index fa036335f..5806f6f73 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -120,8 +120,8 @@ t_websocket_handle_ping_pong(_) -> t_websocket_handle_bad_frame(_) -> with_ws_conn(fun(WsConn) -> - {stop, {shutdown, unexpected_ws_frame}, WsConn} - = websocket_handle({badframe, <<>>}, WsConn) + {stop, WsConn1} = websocket_handle({badframe, <<>>}, WsConn), + ?assertEqual({shutdown, unexpected_ws_frame}, stop_reason(WsConn1)) end). t_websocket_info_call(_) -> @@ -132,11 +132,11 @@ t_websocket_info_call(_) -> end). t_websocket_info_cast(_) -> - with_ws_conn(fun(WsConn) -> - websocket_info({cast, msg}, WsConn) - end). + ok = meck:expect(emqx_channel, handle_info, fun(_Msg, Channel) -> {ok, Channel} end), + with_ws_conn(fun(WsConn) -> websocket_info({cast, msg}, WsConn) end). t_websocket_info_incoming(_) -> + ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end), with_ws_conn(fun(WsConn) -> Connect = ?CONNECT_PACKET( #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, @@ -146,14 +146,18 @@ t_websocket_info_incoming(_) -> keepalive = 60}), {ok, WsConn1} = websocket_info({incoming, Connect}, WsConn), Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), - {ok, WsConn2} = websocket_info({incoming, Publish}, WsConn1) + {ok, _WsConn2} = websocket_info({incoming, Publish}, WsConn1) end). t_websocket_info_deliver(_) -> with_ws_conn(fun(WsConn) -> - Msg = emqx_message:make(<<"topic">>, <<"payload">>), - Deliver = {deliver, <<"#">>, Msg}, - {ok, WsConn1} = websocket_info(Deliver, WsConn) + ok = meck:expect(emqx_channel, handle_out, + fun(Delivers, Channel) -> + Packets = [emqx_message:to_packet(1, Msg) || {deliver, _, Msg} <- Delivers], + {ok, {outgoing, Packets}, Channel} + end), + Deliver = {deliver, <<"#">>, emqx_message:make(<<"topic">>, <<"payload">>)}, + {reply, {binary, _Data}, _WsConn1} = websocket_info(Deliver, WsConn) end). t_websocket_info_timeout(_) -> @@ -165,23 +169,31 @@ t_websocket_info_timeout(_) -> t_websocket_info_close(_) -> with_ws_conn(fun(WsConn) -> - {stop, {shutdown, sock_error}, WsConn} = websocket_info({close, sock_error}, WsConn) + {stop, WsConn1} = websocket_info({close, sock_error}, WsConn), + ?assertEqual({shutdown, sock_error}, stop_reason(WsConn1)) end). t_websocket_info_shutdown(_) -> with_ws_conn(fun(WsConn) -> - {stop, {shutdown, reason}, WsConn} = websocket_info({shutdown, reason}, WsConn) + {stop, WsConn1} = websocket_info({shutdown, reason}, WsConn), + ?assertEqual({shutdown, reason}, stop_reason(WsConn1)) end). + t_websocket_info_stop(_) -> with_ws_conn(fun(WsConn) -> - {stop, normal, WsConn} = websocket_info({stop, normal}, WsConn) + {stop, WsConn1} = websocket_info({stop, normal}, WsConn), + ?assertEqual(normal, stop_reason(WsConn1)) end). t_websocket_close(_) -> + ok = meck:expect(emqx_channel, handle_info, + fun({sock_closed, badframe}, Channel) -> + {shutdown, sock_closed, Channel} + end), with_ws_conn(fun(WsConn) -> - {stop, sock_closed, WsConn} - = emqx_ws_connection:websocket_close(badframe, WsConn) + {stop, WsConn1} = emqx_ws_connection:websocket_close(badframe, WsConn), + ?assertEqual(sock_closed, stop_reason(WsConn1)) end). t_handle_call(_) -> @@ -217,3 +229,6 @@ with_ws_conn(TestFun, Opts) -> [req, emqx_misc:merge_opts([{zone, external}], Opts)]), TestFun(WsConn). +stop_reason(WsConn) -> + emqx_ws_connection:info(stop_reason, WsConn). +