diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 012c35852..94916637d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -148,6 +148,8 @@ info(will_msg, #channel{will_msg = WillMsg}) -> emqx_message:to_map(WillMsg); info(pub_stats, #channel{pub_stats = PubStats}) -> PubStats; +info(timers, #channel{timers = Timers}) -> + Timers; info(gc_state, #channel{gc_state = GcState}) -> maybe_apply(fun emqx_gc:info/1, GcState). @@ -635,12 +637,12 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn handle_out(puback, {PacketId, ReasonCode}, Channel) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)}; -handle_out(pubrel, {PacketId, ReasonCode}, Channel) -> - {ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)}; - handle_out(pubrec, {PacketId, ReasonCode}, Channel) -> {ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)}; +handle_out(pubrel, {PacketId, ReasonCode}, Channel) -> + {ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)}; + handle_out(pubcomp, {PacketId, ReasonCode}, Channel) -> {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)}; @@ -1120,6 +1122,7 @@ enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBri NL = flag(emqx_zone:ignore_loop_deliver(Zone)), SubOpts#{rap => flag(IsBridge), nl => NL}. +%% TODO: Default caps should not be returned. enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}, clientinfo = #{zone := Zone}}) -> #{max_packet_size := MaxPktSize, diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 8df85bea3..7d8c62433 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -19,11 +19,6 @@ -compile(export_all). -compile(nowarn_export_all). --import(emqx_channel, - [ handle_in/2 - , handle_out/2 - ]). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -55,427 +50,527 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. +init_per_testcase(_TestCase, Config) -> + ok = meck:new(emqx_broker, [passthrough, no_history]), + ok = meck:new(emqx_hooks, [passthrough, no_history]), + ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), + ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end), + ok = meck:new(emqx_session, [passthrough, no_history]), + ok = meck:new(emqx_metrics, [passthrough, no_history]), + ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), + ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), + Config. + +end_per_testcase(_TestCase, Config) -> + ok = meck:unload(emqx_metrics), + ok = meck:unload(emqx_session), + ok = meck:unload(emqx_broker), + ok = meck:unload(emqx_hooks), + Config. + %%-------------------------------------------------------------------- %% Test cases for channel info/stats/caps %%-------------------------------------------------------------------- -t_chan_info(_) -> error('TODO'). +t_chan_info(_) -> + #{conn_state := connected, + clientinfo := ClientInfo + } = emqx_channel:info(channel()), + ?assertEqual(clientinfo(), ClientInfo). -t_chan_attrs(_) -> error('TODO'). +t_chan_attrs(_) -> + #{conn_state := connected} = emqx_channel:attrs(channel()). -t_chan_stats(_) -> error('TODO'). +t_chan_stats(_) -> + [] = emqx_channel:stats(channel()). -t_chan_caps(_) -> error('TODO'). +t_chan_caps(_) -> + Caps = emqx_channel:caps(channel()). -t_chan_recvd(_) -> error('TODO'). +t_chan_recvd(_) -> + _Channel = emqx_channel:recvd(10, channel()). -t_chan_sent(_) -> error('TODO'). +t_chan_sent(_) -> + _Channel = emqx_channel:sent(10, channel()). %%-------------------------------------------------------------------- %% Test cases for channel init %%-------------------------------------------------------------------- -t_chan_init(_) -> error('TODO'). +%% TODO: +t_chan_init(_) -> + Channel = channel(). %%-------------------------------------------------------------------- %% Test cases for channel handle_in %%-------------------------------------------------------------------- -t_handle_in_connect_packet(_) -> - ConnPkt = #mqtt_packet_connect{ - proto_name = <<"MQTT">>, - proto_ver = ?MQTT_PROTO_V4, - is_bridge = false, - clean_start = true, - keepalive = 30, - properties = undefined, - clientid = <<"clientid">>, - username = <<"username">>, - password = <<"passwd">> - }, - with_chan(fun(Channel) -> - ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}), - ExpectedOutput = [{enter, connected},{outgoing, ConnAck}], - {ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel), - ?assertEqual(ExpectedOutput, Output), - #{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1), - ?assertEqual(<<"clientid">>, ClientId), - ?assertEqual(<<"username">>, Username) - end). +t_handle_in_connect_packet_sucess(_) -> + ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}), + {ok, {connack, ConnAck}, Channel} + = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel()), + ClientInfo = emqx_channel:info(clientinfo, Channel), + ?assertMatch(#{clientid := <<"clientid">>, + username := <<"username">> + }, ClientInfo), + ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). t_handle_in_unexpected_connect_packet(_) -> - error('TODO'). + Channel = emqx_channel:set_field(conn_state, connected, channel()), + Result = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel), + ?assertEqual({shutdown, protocol_error, Channel}, Result). t_handle_in_qos0_publish(_) -> - with_chan(fun(Channel) -> - Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>), - {ok, Channel1} = handle_in(Publish, Channel), - ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, Channel1)) - end). + ok = meck:expect(emqx_broker, publish, fun(_) -> ok end), + Channel = channel(#{conn_state => connected}), + Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>), + {ok, NChannel} = emqx_channel:handle_in(Publish, Channel), + ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_qos1_publish(_) -> - with_chan(fun(Channel) -> - Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), - {ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, Channel), - ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)) - end). + ok = meck:expect(emqx_broker, publish, fun(_) -> ok end), + Channel = channel(#{conn_state => connected}), + Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), + {ok, ?PUBACK_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel), + ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)), + ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_qos2_publish(_) -> - with_chan( - fun(Channel) -> - Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), - {ok, ?PUBREC_PACKET(1, RC), Channel1} = handle_in(Publish1, Channel), - Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>), - {ok, ?PUBREC_PACKET(2, RC), Channel2} = handle_in(Publish2, Channel1), - ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)), - #{awaiting_rel := AwaitingRel} = emqx_channel:info(session, Channel2), - ?assertEqual(2, AwaitingRel) - end). + ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end), + Channel = channel(#{conn_state => connected}), + Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), + {ok, ?PUBREC_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel), + ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)), + ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)). -t_handle_in_puback(_) -> - with_chan( - fun(Channel) -> - {ok, Channel1} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel), - ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel1)) - end). +t_handle_in_puback_ok(_) -> + Msg = emqx_message:make(<<"t">>, <<"payload">>), + ok = meck:expect(emqx_session, puback, + fun(PacketId, Session) -> {ok, Msg, Session} end), + Channel = channel(#{conn_state => connected}), + {ok, NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel), + ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)). -t_handle_in_pubrec(_) -> - with_chan( - fun(Channel) -> - {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1} - = handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel), - ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, emqx_channel:info(pub_stats, Channel1)) - end). +t_handle_in_puback_id_in_use(_) -> + ok = meck:expect(emqx_session, puback, + fun(_, _Session) -> + {error, ?RC_PACKET_IDENTIFIER_IN_USE} + end), + Channel = channel(#{conn_state => connected}), + PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS), + {ok, Channel} = emqx_channel:handle_in(PubAck, Channel). -t_handle_in_pubrel(_) -> - with_chan( - fun(Channel) -> - {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1} - = handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel), - ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1)) - end). +t_handle_in_puback_id_not_found(_) -> + ok = meck:expect(emqx_session, puback, + fun(_, _Session) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end), + Channel = channel(#{conn_state => connected}), + PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS), + {ok, Channel} = emqx_channel:handle_in(PubAck, Channel). -t_handle_in_pubcomp(_) -> - with_chan( - fun(Channel) -> - {ok, Channel1} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel), - ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)) - end). +t_handle_in_pubrec_ok(_) -> + Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>), + ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end), + Channel = channel(#{conn_state => connected}), + {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), Channel1} + = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel), + ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, + emqx_channel:info(pub_stats, Channel1)). + +t_handle_in_pubrec_id_in_use(_) -> + ok = meck:expect(emqx_session, pubrec, + fun(_, Session) -> + {error, ?RC_PACKET_IDENTIFIER_IN_USE} + end), + Channel = channel(#{conn_state => connected}), + {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} + = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). + +t_handle_in_pubrec_id_not_found(_) -> + ok = meck:expect(emqx_session, pubrec, + fun(_, Session) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end), + Channel = channel(#{conn_state => connected}), + {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel} + = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). + +t_handle_in_pubrel_ok(_) -> + ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end), + Channel = channel(#{conn_state => connected}), + {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel1} + = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel), + ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1}, + emqx_channel:info(pub_stats, Channel1)). + +t_handle_in_pubrel_not_found_error(_) -> + ok = meck:expect(emqx_session, pubrel, + fun(_PacketId, _Session) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end), + Channel = channel(#{conn_state => connected}), + {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel} + = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel). + +t_handle_in_pubcomp_ok(_) -> + ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end), + Channel = channel(#{conn_state => connected}), + {ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel), + ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)). + +t_handle_in_pubcomp_not_found_error(_) -> + ok = meck:expect(emqx_session, pubcomp, + fun(_PacketId, _Session) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end), + Channel = channel(#{conn_state => connected}), + {ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel), + ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)). t_handle_in_subscribe(_) -> - with_chan( - fun(Channel) -> - TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], - {ok, ?SUBACK_PACKET(10, [?QOS_0]), Channel1} - = handle_in(?SUBSCRIBE_PACKET(10, #{}, TopicFilters), Channel), - #{subscriptions := Subscriptions} - = emqx_channel:info(session, Channel1), - ?assertEqual(maps:from_list(TopicFilters), Subscriptions) - end). + ok = meck:expect(emqx_session, subscribe, + fun(_, _, _, Session) -> + {ok, Session} + end), + Channel = channel(#{conn_state => connected}), + TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], + Subscribe = ?SUBSCRIBE_PACKET(1, #{}, TopicFilters), + {ok, ?SUBACK_PACKET(1, [?QOS_0]), _} = emqx_channel:handle_in(Subscribe, Channel). t_handle_in_unsubscribe(_) -> - with_chan( - fun(Channel) -> - {ok, ?UNSUBACK_PACKET(11), Channel} - = handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), Channel) - end). + ok = meck:expect(emqx_session, unsubscribe, + fun(_, _, Session) -> + {ok, Session} + end), + Channel = channel(#{conn_state => connected}), + UnsubPkt = ?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), + {ok, ?UNSUBACK_PACKET(1), _} = emqx_channel:handle_in(UnsubPkt, Channel). t_handle_in_pingreq(_) -> - with_chan(fun(Channel) -> - {ok, ?PACKET(?PINGRESP), Channel} = handle_in(?PACKET(?PINGREQ), Channel) - end). + {ok, ?PACKET(?PINGRESP), _Channel} + = emqx_channel:handle_in(?PACKET(?PINGREQ), channel()). t_handle_in_disconnect(_) -> - with_chan(fun(Channel) -> - {stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), - ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)) - end). + Channel = channel(#{conn_state => connected}), + {shutdown, normal, Channel1} = emqx_channel:handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), + ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)). t_handle_in_auth(_) -> - with_chan(fun(Channel) -> - Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR), - {stop, {shutdown, implementation_specific_error}, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel) - end). + Channel = channel(#{conn_state => connected}), + Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR), + {shutdown, implementation_specific_error, Packet, Channel} + = emqx_channel:handle_in(?AUTH_PACKET(), Channel). t_handle_in_frame_error(_) -> - with_chan(fun(Channel) -> error('TODO') end). + IdleChannel = channel(#{conn_state => idle}), + {shutdown, frame_too_large, _} + = emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel), + ConnectingChan = channel(#{conn_state => connecting}), + {shutdown, frame_too_large, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), _} + = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan), + ConnectedChan = channel(#{conn_state => connected}), + {shutdown, frame_too_large, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} + = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan), + DisconnectedChan = channel(#{conn_state => disconnected}), + {ok, DisconnectedChan} + = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan). t_handle_in_expected_packet(_) -> - with_chan(fun(Channel) -> error('TODO') end). + {ok, _Chan} = emqx_channel:handle_in(packet, channel()). t_process_connect(_) -> - with_chan(fun(Channel) -> error('TODO') end). + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ConnPkt = connpkt(), + {ok, ?CONNACK_PACKET(?RC_SUCCESS), ConnPkt, _} + = emqx_channel:process_connect(ConnPkt, channel()). t_handle_publish(_) -> - with_chan(fun(Channel) -> error('TODO') end). + Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), + {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel} + = emqx_channel:handle_publish(Publish, channel()). -t_process_publish(_) -> - with_chan(fun(Channel) -> error('TODO') end). +t_process_publish_qos1(_) -> + Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>), + {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel} + = emqx_channel:process_publish(1, Msg, channel()). t_process_subscribe(_) -> - with_chan(fun(Channel) -> error('TODO') end). + TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], + {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, channel()). t_process_unsubscribe(_) -> - with_chan(fun(Channel) -> error('TODO') end). + ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end), + TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], + {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, channel()). %%-------------------------------------------------------------------- %% Test cases for handle_out %%-------------------------------------------------------------------- t_handle_out_delivers(_) -> - with_chan(fun(Channel) -> - TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}], - {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1} - = handle_in(?SUBSCRIBE_PACKET(1, #{}, TopicFilters), Channel), - Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>), - Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>), - Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], - {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1), - ?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets]) - end). + ok = emqx_meck:expect(emqx_session, deliver, + fun(Delivers, Session) -> + Msgs = [Msg || {deliver, _, Msg} <- Delivers], + [{publish, PacketId, Msg} || {PacketId, Msg} <- lists:zip(lists:seq(1, length(Msgs)), Msgs)] + end), + Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>), + Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>), + Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], + {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, channel()), + ?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]). -t_handle_out_connack_sucess(_) -> - ConnPkt = #mqtt_packet_connect{ - proto_name = <<"MQTT">>, - proto_ver = ?MQTT_PROTO_V4, - clean_start = true, - properties = #{}, - clientid = <<"clientid">> - }, - with_chan(fun(Channel) -> - {ok, [{enter, connected},{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan} - = handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel), - {stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} - = handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel) - end). - -t_handle_out_connack_failure(_) -> - with_chan(fun(Channel) -> error('TODO') end). +t_handle_out_publishes(_) -> + Channel = channel(#{conn_state => connected}), + Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)}, + Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)}, + {ok, {outgoing, Packets}, NChannel} + = emqx_channel:handle_out({publish, [Pub0, Pub1]}, Channel), + ?assertEqual(2, length(Packets)), + ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, NChannel)). t_handle_out_publish(_) -> - with_chan(fun(Channel) -> - Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)}, - Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)}, - {ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel), - {ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel), - {ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel), - ?assertEqual(2, length(Packets)), - ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1)) - end). + Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>), + {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, <<"payload">>), _Channel} + = emqx_channel:handle_out({publish, 1, Msg}, channel()). + +t_handle_out_publish_nl(_) -> + ClientInfo = clientinfo(#{clientid => <<"clientid">>}), + Channel = channel(#{clientinfo => ClientInfo}), + Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>), + Publish = {publish, 1, emqx_message:set_flag(nl, Msg)}, + {ok, Channel} = emqx_channel:handle_out(Publish, Channel). + +t_handle_out_connack_sucess(_) -> + Channel = channel(#{conn_state => connected}), + {ok, {connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}, _Chan} + = emqx_channel:handle_out({connack, ?RC_SUCCESS, 0, connpkt()}, Channel). + +t_handle_out_connack_failure(_) -> + {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} + = emqx_channel:handle_out({connack, ?RC_NOT_AUTHORIZED, connpkt()}, channel()). t_handle_out_puback(_) -> - with_chan(fun(Channel) -> - {ok, Channel} = handle_out({puberr, ?RC_NOT_AUTHORIZED}, Channel), - {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Channel1} - = handle_out({puback, 1, ?RC_SUCCESS}, Channel), - ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, Channel1)) - end). + Channel = channel(#{conn_state => connected}), + {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), NChannel} + = emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel), + ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_out_pubrec(_) -> - with_chan(fun(Channel) -> - {ok, ?PUBREC_PACKET(4, ?RC_SUCCESS), Channel1} - = handle_out({pubrec, 4, ?RC_SUCCESS}, Channel), - ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, Channel1)) - end). + Channel = channel(#{conn_state => connected}), + {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), NChannel} + = emqx_channel:handle_out({pubrec, 1, ?RC_SUCCESS}, Channel), + ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_out_pubrel(_) -> - with_chan(fun(Channel) -> - {ok, ?PUBREL_PACKET(2), Channel1} - = handle_out({pubrel, 2, ?RC_SUCCESS}, Channel), - {ok, ?PUBREL_PACKET(3, ?RC_SUCCESS), Channel2} - = handle_out({pubrel, 3, ?RC_SUCCESS}, Channel1), - ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)) - end). + Channel = channel(#{conn_state => connected}), + {ok, ?PUBREL_PACKET(1), Channel1} + = emqx_channel:handle_out({pubrel, 1, ?RC_SUCCESS}, Channel), + {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), Channel2} + = emqx_channel:handle_out({pubrel, 2, ?RC_SUCCESS}, Channel1), + ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)). t_handle_out_pubcomp(_) -> - with_chan(fun(Channel) -> - {ok, ?PUBCOMP_PACKET(5, ?RC_SUCCESS), Channel1} - = handle_out({pubcomp, 5, ?RC_SUCCESS}, Channel), - ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1)) - end). + {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel} + = emqx_channel:handle_out({pubcomp, 2, ?RC_SUCCESS}, channel()), + ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_out_suback(_) -> - with_chan(fun(Channel) -> - {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel} - = handle_out({suback, 1, [?QOS_2]}, Channel) - end). + {ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel} + = emqx_channel:handle_out({suback, 1, [?QOS_2]}, channel()). t_handle_out_unsuback(_) -> - with_chan(fun(Channel) -> - {ok, ?UNSUBACK_PACKET(1), Channel} - = handle_out({unsuback, 1, [?RC_SUCCESS]}, Channel) - end). + {ok, ?UNSUBACK_PACKET(1, [?RC_SUCCESS]), _Channel} + = emqx_channel:handle_out({unsuback, 1, [?RC_SUCCESS]}, channel()). t_handle_out_disconnect(_) -> - with_chan( - fun(Channel) -> - handle_out({disconnect, ?RC_SUCCESS}, Channel) - end). + {shutdown, normal, ?DISCONNECT_PACKET(?RC_SUCCESS), _Chan} + = emqx_channel:handle_out({disconnect, ?RC_SUCCESS}, channel()). t_handle_out_unexpected(_) -> - with_chan(fun(Channel) -> - handle_out({disconnect, ?RC_SUCCESS}, Channel) - end). + {ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()). %%-------------------------------------------------------------------- %% Test cases for handle_call %%-------------------------------------------------------------------- t_handle_call_kick(_) -> - error('TODO'). + {shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()). t_handle_call_discard(_) -> - error('TODO'). + Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), + {shutdown, discarded, ok, Packet, _Channel} + = emqx_channel:handle_call(discard, channel()). -t_handle_call_takeover(_) -> - error('TODO'). +t_handle_call_takeover_begin(_) -> + {reply, undefined, _Channel} + = emqx_channel:handle_call({takeover, 'begin'}, channel()). + +t_handle_call_takeover_end(_) -> + ok = meck:expect(emqx_session, takeover, fun(_) -> ok end), + {shutdown, takeovered, [], _Channel} + = emqx_channel:handle_call({takeover, 'end'}, channel()). t_handle_call_unexpected(_) -> - error('TODO'). + {reply, ignored, _Channel} = emqx_channel:handle_call(unexpected_req, channel()). %%-------------------------------------------------------------------- %% Test cases for handle_info %%-------------------------------------------------------------------- t_handle_info_subscribe(_) -> - error('TODO'). + ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), + {ok, _Chan} = emqx_channel:handle_info({subscribe, topic_filters()}, channel()). t_handle_info_unsubscribe(_) -> - error('TODO'). + ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end), + {ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()). t_handle_info_sock_closed(_) -> - error('TODO'). + {ok, _Chan} = emqx_channel:handle_out({sock_closed, reason}, channel(#{conn_state => disconnected})). %%-------------------------------------------------------------------- %% Test cases for handle_timeout %%-------------------------------------------------------------------- t_handle_timeout_emit_stats(_) -> - with_chan(fun(Channel) -> 'TODO' end). + {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {emit_stats, []}, channel()). t_handle_timeout_keepalive(_) -> - with_chan(fun(Channel) -> 'TODO' end). + {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, channel()). t_handle_timeout_retry_delivery(_) -> - with_chan(fun(Channel) -> 'TODO' end). + {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), retry_delivery, channel()). t_handle_timeout_expire_awaiting_rel(_) -> - with_chan(fun(Channel) -> 'TODO' end). + {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()). t_handle_timeout_expire_session(_) -> - with_chan(fun(Channel) -> 'TODO' end). + {shutdown, expired, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()). t_handle_timeout_will_message(_) -> - with_chan(fun(Channel) -> 'TODO' end). - -%%-------------------------------------------------------------------- -%% Test cases for ensure_timer -%%-------------------------------------------------------------------- - -t_ensure_timer(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_reset_timer(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_alive_timer(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_retry_timer(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_await_timer(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_expire_timer(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_will_timer(_) -> - with_chan(fun(Channel) -> error('TODO') end). + {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), will_message, channel()). %%-------------------------------------------------------------------- %% Test cases for internal functions %%-------------------------------------------------------------------- t_enrich_conninfo(_) -> - with_chan(fun(Channel) -> error('TODO') end). + {ok, _Chan} = emqx_channel:enrich_conninfo(connpkt(), channel()). t_enrich_client(_) -> - with_chan(fun(Channel) -> error('TODO') end). + {ok, _ConnPkt, _Chan} = emqx_channel:enrich_client(connpkt(), channel()). t_check_banned(_) -> - with_chan(fun(Channel) -> error('TODO') end). + ok = emqx_channel:check_banned(connpkt(), channel()). t_check_flapping(_) -> - with_chan(fun(Channel) -> error('TODO') end). + ok = emqx_channel:check_flapping(connpkt(), channel()). t_auth_connect(_) -> - with_chan(fun(Channel) -> error('TODO') end). + ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {ok, #{}} end), + {ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()). t_process_alias(_) -> - with_chan(fun(Channel) -> error('TODO') end). + Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}}, + Channel = emqx_channel:set_field(topic_aliases, #{1 => <<"t">>}, channel()), + {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} + = emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel). t_check_pub_acl(_) -> - with_chan(fun(Channel) -> error('TODO') end). + Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), + ok = emqx_channel:check_pub_acl(Publish, channel()). t_check_pub_alias(_) -> - with_chan(fun(Channel) -> error('TODO') end). + Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}}, + ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, channel()). t_check_subscribe(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_check_sub_acl(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_check_sub_caps(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_enrich_subid(_) -> - with_chan(fun(Channel) -> error('TODO') end). - -t_enrich_subopts(_) -> - with_chan(fun(Channel) -> error('TODO') end). + ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()). t_enrich_caps(_) -> - with_chan(fun(Channel) -> error('TODO') end). + ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]), + ok = meck:expect(emqx_mqtt_caps, get_caps, + fun(_Zone) -> + #{max_packet_size => 1024, + max_qos_allowed => ?QOS_2, + retain_available => true, + max_topic_alias => 10, + shared_subscription => true, + wildcard_subscription => true + } + end), + AckProps = emqx_channel:enrich_caps(#{}, channel()), + ?assertMatch(#{'Retain-Available' := 1, + 'Maximum-Packet-Size' := 1024, + 'Topic-Alias-Maximum' := 10, + 'Wildcard-Subscription-Available' := 1, + 'Subscription-Identifier-Available' := 1, + 'Shared-Subscription-Available' := 1, + 'Maximum-QoS' := ?QOS_2 + }, AckProps), + ok = meck:unload(emqx_mqtt_caps). %%-------------------------------------------------------------------- %% Test cases for terminate %%-------------------------------------------------------------------- t_terminate(_) -> - with_chan( - fun(Channel) -> - 'TODO' - end). + ok = emqx_channel:terminate(normal, channel()), + ok = emqx_channel:terminate(sock_error, channel(#{conn_state => connected})), + ok = emqx_channel:terminate({shutdown, kicked}, channel(#{conn_state => connected})). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- -with_connected_channel(TestFun) -> - with_chan( - fun(Channel) -> - TestFun(emqx_channel:set_field(conn_state, connected, Channel)) - end). +channel() -> channel(#{}). +channel(InitFields) -> + maps:fold(fun(Field, Value, Channel) -> + emqx_channel:set_field(Field, Value, Channel) + end, default_channel(), InitFields). -with_chan(TestFun) -> - with_chan(#{}, TestFun). +default_channel() -> + Channel = emqx_channel:init(?DEFAULT_CONNINFO, [{zone, zone}]), + Channel1 = emqx_channel:set_field(conn_state, connected, Channel), + emqx_channel:set_field(clientinfo, clientinfo(), Channel1). -with_chan(ConnInfo, TestFun) -> - ConnInfo1 = maps:merge(?DEFAULT_CONNINFO, ConnInfo), - ClientInfo = #{zone => <<"external">>, - protocol => mqtt, - peerhost => {127,0,0,1}, - clientid => <<"clientid">>, - username => <<"username">>, - peercert => undefined, - is_bridge => false, - is_superuser => false, - mountpoint => undefined - }, - Channel = emqx_channel:init(ConnInfo1, [{zone, testing}]), - Session = emqx_session:init(ClientInfo, ConnInfo1), - Channel1 = emqx_channel:set_field(clientinfo, ClientInfo, Channel), - TestFun(emqx_channel:set_field(session, Session, Channel1)). +clientinfo() -> clientinfo(#{}). +clientinfo(InitProps) -> + maps:merge(#{zone => zone, + protocol => mqtt, + peerhost => {127,0,0,1}, + clientid => <<"clientid">>, + username => <<"username">>, + is_superuser => false, + peercert => undefined, + mountpoint => undefined + }, InitProps). + +topic_filters() -> + [{<<"+">>, ?DEFAULT_SUBOPTS}, {<<"#">>, ?DEFAULT_SUBOPTS}]. + +connpkt() -> + #mqtt_packet_connect{ + proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V4, + is_bridge = false, + clean_start = true, + keepalive = 30, + properties = undefined, + clientid = <<"clientid">>, + username = <<"username">>, + password = <<"passwd">> + }. + +session() -> session(#{}). +session(InitFields) when is_map(InitFields) -> + maps:fold(fun(Field, Value, Session) -> + emqx_session:set_field(Field, Value, Session) + end, + emqx_session:init(#{zone => zone}, #{receive_maximum => 0}), + InitFields).