Add test cases for emqx_channel module
This commit is contained in:
parent
aecda09b9a
commit
26eab630b3
|
@ -148,6 +148,8 @@ info(will_msg, #channel{will_msg = WillMsg}) ->
|
||||||
emqx_message:to_map(WillMsg);
|
emqx_message:to_map(WillMsg);
|
||||||
info(pub_stats, #channel{pub_stats = PubStats}) ->
|
info(pub_stats, #channel{pub_stats = PubStats}) ->
|
||||||
PubStats;
|
PubStats;
|
||||||
|
info(timers, #channel{timers = Timers}) ->
|
||||||
|
Timers;
|
||||||
info(gc_state, #channel{gc_state = GcState}) ->
|
info(gc_state, #channel{gc_state = GcState}) ->
|
||||||
maybe_apply(fun emqx_gc:info/1, GcState).
|
maybe_apply(fun emqx_gc:info/1, GcState).
|
||||||
|
|
||||||
|
@ -635,12 +637,12 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
|
||||||
handle_out(puback, {PacketId, ReasonCode}, Channel) ->
|
handle_out(puback, {PacketId, ReasonCode}, Channel) ->
|
||||||
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)};
|
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)};
|
||||||
|
|
||||||
handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
|
|
||||||
{ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)};
|
|
||||||
|
|
||||||
handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
|
handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
|
||||||
{ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)};
|
{ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)};
|
||||||
|
|
||||||
|
handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
|
||||||
|
{ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)};
|
||||||
|
|
||||||
handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
|
handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
|
||||||
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)};
|
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)};
|
||||||
|
|
||||||
|
@ -1120,6 +1122,7 @@ enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBri
|
||||||
NL = flag(emqx_zone:ignore_loop_deliver(Zone)),
|
NL = flag(emqx_zone:ignore_loop_deliver(Zone)),
|
||||||
SubOpts#{rap => flag(IsBridge), nl => NL}.
|
SubOpts#{rap => flag(IsBridge), nl => NL}.
|
||||||
|
|
||||||
|
%% TODO: Default caps should not be returned.
|
||||||
enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5},
|
enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5},
|
||||||
clientinfo = #{zone := Zone}}) ->
|
clientinfo = #{zone := Zone}}) ->
|
||||||
#{max_packet_size := MaxPktSize,
|
#{max_packet_size := MaxPktSize,
|
||||||
|
|
|
@ -19,11 +19,6 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-import(emqx_channel,
|
|
||||||
[ handle_in/2
|
|
||||||
, handle_out/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
@ -55,427 +50,527 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
ok = meck:new(emqx_broker, [passthrough, no_history]),
|
||||||
|
ok = meck:new(emqx_hooks, [passthrough, no_history]),
|
||||||
|
ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
|
||||||
|
ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
|
||||||
|
ok = meck:new(emqx_session, [passthrough, no_history]),
|
||||||
|
ok = meck:new(emqx_metrics, [passthrough, no_history]),
|
||||||
|
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||||
|
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_TestCase, Config) ->
|
||||||
|
ok = meck:unload(emqx_metrics),
|
||||||
|
ok = meck:unload(emqx_session),
|
||||||
|
ok = meck:unload(emqx_broker),
|
||||||
|
ok = meck:unload(emqx_hooks),
|
||||||
|
Config.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for channel info/stats/caps
|
%% Test cases for channel info/stats/caps
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_chan_info(_) -> error('TODO').
|
t_chan_info(_) ->
|
||||||
|
#{conn_state := connected,
|
||||||
|
clientinfo := ClientInfo
|
||||||
|
} = emqx_channel:info(channel()),
|
||||||
|
?assertEqual(clientinfo(), ClientInfo).
|
||||||
|
|
||||||
t_chan_attrs(_) -> error('TODO').
|
t_chan_attrs(_) ->
|
||||||
|
#{conn_state := connected} = emqx_channel:attrs(channel()).
|
||||||
|
|
||||||
t_chan_stats(_) -> error('TODO').
|
t_chan_stats(_) ->
|
||||||
|
[] = emqx_channel:stats(channel()).
|
||||||
|
|
||||||
t_chan_caps(_) -> error('TODO').
|
t_chan_caps(_) ->
|
||||||
|
Caps = emqx_channel:caps(channel()).
|
||||||
|
|
||||||
t_chan_recvd(_) -> error('TODO').
|
t_chan_recvd(_) ->
|
||||||
|
_Channel = emqx_channel:recvd(10, channel()).
|
||||||
|
|
||||||
t_chan_sent(_) -> error('TODO').
|
t_chan_sent(_) ->
|
||||||
|
_Channel = emqx_channel:sent(10, channel()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for channel init
|
%% Test cases for channel init
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_chan_init(_) -> error('TODO').
|
%% TODO:
|
||||||
|
t_chan_init(_) ->
|
||||||
|
Channel = channel().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for channel handle_in
|
%% Test cases for channel handle_in
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_handle_in_connect_packet(_) ->
|
t_handle_in_connect_packet_sucess(_) ->
|
||||||
ConnPkt = #mqtt_packet_connect{
|
ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}),
|
||||||
proto_name = <<"MQTT">>,
|
{ok, {connack, ConnAck}, Channel}
|
||||||
proto_ver = ?MQTT_PROTO_V4,
|
= emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel()),
|
||||||
is_bridge = false,
|
ClientInfo = emqx_channel:info(clientinfo, Channel),
|
||||||
clean_start = true,
|
?assertMatch(#{clientid := <<"clientid">>,
|
||||||
keepalive = 30,
|
username := <<"username">>
|
||||||
properties = undefined,
|
}, ClientInfo),
|
||||||
clientid = <<"clientid">>,
|
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
|
||||||
username = <<"username">>,
|
|
||||||
password = <<"passwd">>
|
|
||||||
},
|
|
||||||
with_chan(fun(Channel) ->
|
|
||||||
ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}),
|
|
||||||
ExpectedOutput = [{enter, connected},{outgoing, ConnAck}],
|
|
||||||
{ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel),
|
|
||||||
?assertEqual(ExpectedOutput, Output),
|
|
||||||
#{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1),
|
|
||||||
?assertEqual(<<"clientid">>, ClientId),
|
|
||||||
?assertEqual(<<"username">>, Username)
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_in_unexpected_connect_packet(_) ->
|
t_handle_in_unexpected_connect_packet(_) ->
|
||||||
error('TODO').
|
Channel = emqx_channel:set_field(conn_state, connected, channel()),
|
||||||
|
Result = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel),
|
||||||
|
?assertEqual({shutdown, protocol_error, Channel}, Result).
|
||||||
|
|
||||||
t_handle_in_qos0_publish(_) ->
|
t_handle_in_qos0_publish(_) ->
|
||||||
with_chan(fun(Channel) ->
|
ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
|
||||||
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
|
Channel = channel(#{conn_state => connected}),
|
||||||
{ok, Channel1} = handle_in(Publish, Channel),
|
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
|
||||||
?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, Channel1))
|
{ok, NChannel} = emqx_channel:handle_in(Publish, Channel),
|
||||||
end).
|
?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
|
|
||||||
t_handle_in_qos1_publish(_) ->
|
t_handle_in_qos1_publish(_) ->
|
||||||
with_chan(fun(Channel) ->
|
ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
|
||||||
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
|
Channel = channel(#{conn_state => connected}),
|
||||||
{ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, Channel),
|
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
|
||||||
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS))
|
{ok, ?PUBACK_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
|
||||||
end).
|
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
|
||||||
|
?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
|
|
||||||
t_handle_in_qos2_publish(_) ->
|
t_handle_in_qos2_publish(_) ->
|
||||||
with_chan(
|
ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
|
||||||
fun(Channel) ->
|
Channel = channel(#{conn_state => connected}),
|
||||||
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
||||||
{ok, ?PUBREC_PACKET(1, RC), Channel1} = handle_in(Publish1, Channel),
|
{ok, ?PUBREC_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
|
||||||
Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
|
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
|
||||||
{ok, ?PUBREC_PACKET(2, RC), Channel2} = handle_in(Publish2, Channel1),
|
?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
|
|
||||||
#{awaiting_rel := AwaitingRel} = emqx_channel:info(session, Channel2),
|
|
||||||
?assertEqual(2, AwaitingRel)
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_in_puback(_) ->
|
t_handle_in_puback_ok(_) ->
|
||||||
with_chan(
|
Msg = emqx_message:make(<<"t">>, <<"payload">>),
|
||||||
fun(Channel) ->
|
ok = meck:expect(emqx_session, puback,
|
||||||
{ok, Channel1} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel),
|
fun(PacketId, Session) -> {ok, Msg, Session} end),
|
||||||
?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel1))
|
Channel = channel(#{conn_state => connected}),
|
||||||
end).
|
{ok, NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel),
|
||||||
|
?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
|
|
||||||
t_handle_in_pubrec(_) ->
|
t_handle_in_puback_id_in_use(_) ->
|
||||||
with_chan(
|
ok = meck:expect(emqx_session, puback,
|
||||||
fun(Channel) ->
|
fun(_, _Session) ->
|
||||||
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1}
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
||||||
= handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel),
|
end),
|
||||||
?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, emqx_channel:info(pub_stats, Channel1))
|
Channel = channel(#{conn_state => connected}),
|
||||||
end).
|
PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS),
|
||||||
|
{ok, Channel} = emqx_channel:handle_in(PubAck, Channel).
|
||||||
|
|
||||||
t_handle_in_pubrel(_) ->
|
t_handle_in_puback_id_not_found(_) ->
|
||||||
with_chan(
|
ok = meck:expect(emqx_session, puback,
|
||||||
fun(Channel) ->
|
fun(_, _Session) ->
|
||||||
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1}
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
= handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel),
|
end),
|
||||||
?assertEqual(#{pubrel_in => 1, pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1))
|
Channel = channel(#{conn_state => connected}),
|
||||||
end).
|
PubAck = ?PUBACK_PACKET(1, ?RC_SUCCESS),
|
||||||
|
{ok, Channel} = emqx_channel:handle_in(PubAck, Channel).
|
||||||
|
|
||||||
t_handle_in_pubcomp(_) ->
|
t_handle_in_pubrec_ok(_) ->
|
||||||
with_chan(
|
Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
|
||||||
fun(Channel) ->
|
ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end),
|
||||||
{ok, Channel1} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
|
Channel = channel(#{conn_state => connected}),
|
||||||
?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1))
|
{ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), Channel1}
|
||||||
end).
|
= emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel),
|
||||||
|
?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
||||||
|
emqx_channel:info(pub_stats, Channel1)).
|
||||||
|
|
||||||
|
t_handle_in_pubrec_id_in_use(_) ->
|
||||||
|
ok = meck:expect(emqx_session, pubrec,
|
||||||
|
fun(_, Session) ->
|
||||||
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
||||||
|
end),
|
||||||
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel}
|
||||||
|
= emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
|
||||||
|
|
||||||
|
t_handle_in_pubrec_id_not_found(_) ->
|
||||||
|
ok = meck:expect(emqx_session, pubrec,
|
||||||
|
fun(_, Session) ->
|
||||||
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
|
end),
|
||||||
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel}
|
||||||
|
= emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
|
||||||
|
|
||||||
|
t_handle_in_pubrel_ok(_) ->
|
||||||
|
ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
|
||||||
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel1}
|
||||||
|
= emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel),
|
||||||
|
?assertEqual(#{pubrel_in => 1, pubcomp_out => 1},
|
||||||
|
emqx_channel:info(pub_stats, Channel1)).
|
||||||
|
|
||||||
|
t_handle_in_pubrel_not_found_error(_) ->
|
||||||
|
ok = meck:expect(emqx_session, pubrel,
|
||||||
|
fun(_PacketId, _Session) ->
|
||||||
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
|
end),
|
||||||
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel}
|
||||||
|
= emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
|
||||||
|
|
||||||
|
t_handle_in_pubcomp_ok(_) ->
|
||||||
|
ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
|
||||||
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
{ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
|
||||||
|
?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
|
||||||
|
|
||||||
|
t_handle_in_pubcomp_not_found_error(_) ->
|
||||||
|
ok = meck:expect(emqx_session, pubcomp,
|
||||||
|
fun(_PacketId, _Session) ->
|
||||||
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
|
end),
|
||||||
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
{ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
|
||||||
|
?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
|
||||||
|
|
||||||
t_handle_in_subscribe(_) ->
|
t_handle_in_subscribe(_) ->
|
||||||
with_chan(
|
ok = meck:expect(emqx_session, subscribe,
|
||||||
fun(Channel) ->
|
fun(_, _, _, Session) ->
|
||||||
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
{ok, Session}
|
||||||
{ok, ?SUBACK_PACKET(10, [?QOS_0]), Channel1}
|
end),
|
||||||
= handle_in(?SUBSCRIBE_PACKET(10, #{}, TopicFilters), Channel),
|
Channel = channel(#{conn_state => connected}),
|
||||||
#{subscriptions := Subscriptions}
|
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
||||||
= emqx_channel:info(session, Channel1),
|
Subscribe = ?SUBSCRIBE_PACKET(1, #{}, TopicFilters),
|
||||||
?assertEqual(maps:from_list(TopicFilters), Subscriptions)
|
{ok, ?SUBACK_PACKET(1, [?QOS_0]), _} = emqx_channel:handle_in(Subscribe, Channel).
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_in_unsubscribe(_) ->
|
t_handle_in_unsubscribe(_) ->
|
||||||
with_chan(
|
ok = meck:expect(emqx_session, unsubscribe,
|
||||||
fun(Channel) ->
|
fun(_, _, Session) ->
|
||||||
{ok, ?UNSUBACK_PACKET(11), Channel}
|
{ok, Session}
|
||||||
= handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), Channel)
|
end),
|
||||||
end).
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
UnsubPkt = ?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]),
|
||||||
|
{ok, ?UNSUBACK_PACKET(1), _} = emqx_channel:handle_in(UnsubPkt, Channel).
|
||||||
|
|
||||||
t_handle_in_pingreq(_) ->
|
t_handle_in_pingreq(_) ->
|
||||||
with_chan(fun(Channel) ->
|
{ok, ?PACKET(?PINGRESP), _Channel}
|
||||||
{ok, ?PACKET(?PINGRESP), Channel} = handle_in(?PACKET(?PINGREQ), Channel)
|
= emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_in_disconnect(_) ->
|
t_handle_in_disconnect(_) ->
|
||||||
with_chan(fun(Channel) ->
|
Channel = channel(#{conn_state => connected}),
|
||||||
{stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
|
{shutdown, normal, Channel1} = emqx_channel:handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
|
||||||
?assertEqual(undefined, emqx_channel:info(will_msg, Channel1))
|
?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)).
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_in_auth(_) ->
|
t_handle_in_auth(_) ->
|
||||||
with_chan(fun(Channel) ->
|
Channel = channel(#{conn_state => connected}),
|
||||||
Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
|
Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
|
||||||
{stop, {shutdown, implementation_specific_error}, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel)
|
{shutdown, implementation_specific_error, Packet, Channel}
|
||||||
end).
|
= emqx_channel:handle_in(?AUTH_PACKET(), Channel).
|
||||||
|
|
||||||
t_handle_in_frame_error(_) ->
|
t_handle_in_frame_error(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
IdleChannel = channel(#{conn_state => idle}),
|
||||||
|
{shutdown, frame_too_large, _}
|
||||||
|
= emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
|
||||||
|
ConnectingChan = channel(#{conn_state => connecting}),
|
||||||
|
{shutdown, frame_too_large, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), _}
|
||||||
|
= emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
|
||||||
|
ConnectedChan = channel(#{conn_state => connected}),
|
||||||
|
{shutdown, frame_too_large, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _}
|
||||||
|
= emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
|
||||||
|
DisconnectedChan = channel(#{conn_state => disconnected}),
|
||||||
|
{ok, DisconnectedChan}
|
||||||
|
= emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
|
||||||
|
|
||||||
t_handle_in_expected_packet(_) ->
|
t_handle_in_expected_packet(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
{ok, _Chan} = emqx_channel:handle_in(packet, channel()).
|
||||||
|
|
||||||
t_process_connect(_) ->
|
t_process_connect(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
ok = meck:expect(emqx_cm, open_session,
|
||||||
|
fun(true, _ClientInfo, _ConnInfo) ->
|
||||||
|
{ok, #{session => session(), present => false}}
|
||||||
|
end),
|
||||||
|
ConnPkt = connpkt(),
|
||||||
|
{ok, ?CONNACK_PACKET(?RC_SUCCESS), ConnPkt, _}
|
||||||
|
= emqx_channel:process_connect(ConnPkt, channel()).
|
||||||
|
|
||||||
t_handle_publish(_) ->
|
t_handle_publish(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
|
||||||
|
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel}
|
||||||
|
= emqx_channel:handle_publish(Publish, channel()).
|
||||||
|
|
||||||
t_process_publish(_) ->
|
t_process_publish_qos1(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
|
||||||
|
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _Channel}
|
||||||
|
= emqx_channel:process_publish(1, Msg, channel()).
|
||||||
|
|
||||||
t_process_subscribe(_) ->
|
t_process_subscribe(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
||||||
|
{[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, channel()).
|
||||||
|
|
||||||
t_process_unsubscribe(_) ->
|
t_process_unsubscribe(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end),
|
||||||
|
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
||||||
|
{[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, channel()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for handle_out
|
%% Test cases for handle_out
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_handle_out_delivers(_) ->
|
t_handle_out_delivers(_) ->
|
||||||
with_chan(fun(Channel) ->
|
ok = emqx_meck:expect(emqx_session, deliver,
|
||||||
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}],
|
fun(Delivers, Session) ->
|
||||||
{ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1}
|
Msgs = [Msg || {deliver, _, Msg} <- Delivers],
|
||||||
= handle_in(?SUBSCRIBE_PACKET(1, #{}, TopicFilters), Channel),
|
[{publish, PacketId, Msg} || {PacketId, Msg} <- lists:zip(lists:seq(1, length(Msgs)), Msgs)]
|
||||||
Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>),
|
end),
|
||||||
Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>),
|
Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>),
|
||||||
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
|
Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>),
|
||||||
{ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1),
|
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
|
||||||
?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets])
|
{ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, channel()),
|
||||||
end).
|
?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]).
|
||||||
|
|
||||||
t_handle_out_connack_sucess(_) ->
|
t_handle_out_publishes(_) ->
|
||||||
ConnPkt = #mqtt_packet_connect{
|
Channel = channel(#{conn_state => connected}),
|
||||||
proto_name = <<"MQTT">>,
|
Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
|
||||||
proto_ver = ?MQTT_PROTO_V4,
|
Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
|
||||||
clean_start = true,
|
{ok, {outgoing, Packets}, NChannel}
|
||||||
properties = #{},
|
= emqx_channel:handle_out({publish, [Pub0, Pub1]}, Channel),
|
||||||
clientid = <<"clientid">>
|
?assertEqual(2, length(Packets)),
|
||||||
},
|
?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
with_chan(fun(Channel) ->
|
|
||||||
{ok, [{enter, connected},{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan}
|
|
||||||
= handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel),
|
|
||||||
{stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
|
|
||||||
= handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel)
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_connack_failure(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_handle_out_publish(_) ->
|
t_handle_out_publish(_) ->
|
||||||
with_chan(fun(Channel) ->
|
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
|
||||||
Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
|
{ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, <<"payload">>), _Channel}
|
||||||
Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
|
= emqx_channel:handle_out({publish, 1, Msg}, channel()).
|
||||||
{ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel),
|
|
||||||
{ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel),
|
t_handle_out_publish_nl(_) ->
|
||||||
{ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel),
|
ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
|
||||||
?assertEqual(2, length(Packets)),
|
Channel = channel(#{clientinfo => ClientInfo}),
|
||||||
?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1))
|
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
|
||||||
end).
|
Publish = {publish, 1, emqx_message:set_flag(nl, Msg)},
|
||||||
|
{ok, Channel} = emqx_channel:handle_out(Publish, Channel).
|
||||||
|
|
||||||
|
t_handle_out_connack_sucess(_) ->
|
||||||
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
{ok, {connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}, _Chan}
|
||||||
|
= emqx_channel:handle_out({connack, ?RC_SUCCESS, 0, connpkt()}, Channel).
|
||||||
|
|
||||||
|
t_handle_out_connack_failure(_) ->
|
||||||
|
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan}
|
||||||
|
= emqx_channel:handle_out({connack, ?RC_NOT_AUTHORIZED, connpkt()}, channel()).
|
||||||
|
|
||||||
t_handle_out_puback(_) ->
|
t_handle_out_puback(_) ->
|
||||||
with_chan(fun(Channel) ->
|
Channel = channel(#{conn_state => connected}),
|
||||||
{ok, Channel} = handle_out({puberr, ?RC_NOT_AUTHORIZED}, Channel),
|
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), NChannel}
|
||||||
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Channel1}
|
= emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel),
|
||||||
= handle_out({puback, 1, ?RC_SUCCESS}, Channel),
|
?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, Channel1))
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_pubrec(_) ->
|
t_handle_out_pubrec(_) ->
|
||||||
with_chan(fun(Channel) ->
|
Channel = channel(#{conn_state => connected}),
|
||||||
{ok, ?PUBREC_PACKET(4, ?RC_SUCCESS), Channel1}
|
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), NChannel}
|
||||||
= handle_out({pubrec, 4, ?RC_SUCCESS}, Channel),
|
= emqx_channel:handle_out({pubrec, 1, ?RC_SUCCESS}, Channel),
|
||||||
?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, Channel1))
|
?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_pubrel(_) ->
|
t_handle_out_pubrel(_) ->
|
||||||
with_chan(fun(Channel) ->
|
Channel = channel(#{conn_state => connected}),
|
||||||
{ok, ?PUBREL_PACKET(2), Channel1}
|
{ok, ?PUBREL_PACKET(1), Channel1}
|
||||||
= handle_out({pubrel, 2, ?RC_SUCCESS}, Channel),
|
= emqx_channel:handle_out({pubrel, 1, ?RC_SUCCESS}, Channel),
|
||||||
{ok, ?PUBREL_PACKET(3, ?RC_SUCCESS), Channel2}
|
{ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), Channel2}
|
||||||
= handle_out({pubrel, 3, ?RC_SUCCESS}, Channel1),
|
= emqx_channel:handle_out({pubrel, 2, ?RC_SUCCESS}, Channel1),
|
||||||
?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2))
|
?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)).
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_pubcomp(_) ->
|
t_handle_out_pubcomp(_) ->
|
||||||
with_chan(fun(Channel) ->
|
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel}
|
||||||
{ok, ?PUBCOMP_PACKET(5, ?RC_SUCCESS), Channel1}
|
= emqx_channel:handle_out({pubcomp, 2, ?RC_SUCCESS}, channel()),
|
||||||
= handle_out({pubcomp, 5, ?RC_SUCCESS}, Channel),
|
?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)).
|
||||||
?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1))
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_suback(_) ->
|
t_handle_out_suback(_) ->
|
||||||
with_chan(fun(Channel) ->
|
{ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel}
|
||||||
{ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel}
|
= emqx_channel:handle_out({suback, 1, [?QOS_2]}, channel()).
|
||||||
= handle_out({suback, 1, [?QOS_2]}, Channel)
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_unsuback(_) ->
|
t_handle_out_unsuback(_) ->
|
||||||
with_chan(fun(Channel) ->
|
{ok, ?UNSUBACK_PACKET(1, [?RC_SUCCESS]), _Channel}
|
||||||
{ok, ?UNSUBACK_PACKET(1), Channel}
|
= emqx_channel:handle_out({unsuback, 1, [?RC_SUCCESS]}, channel()).
|
||||||
= handle_out({unsuback, 1, [?RC_SUCCESS]}, Channel)
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_disconnect(_) ->
|
t_handle_out_disconnect(_) ->
|
||||||
with_chan(
|
{shutdown, normal, ?DISCONNECT_PACKET(?RC_SUCCESS), _Chan}
|
||||||
fun(Channel) ->
|
= emqx_channel:handle_out({disconnect, ?RC_SUCCESS}, channel()).
|
||||||
handle_out({disconnect, ?RC_SUCCESS}, Channel)
|
|
||||||
end).
|
|
||||||
|
|
||||||
t_handle_out_unexpected(_) ->
|
t_handle_out_unexpected(_) ->
|
||||||
with_chan(fun(Channel) ->
|
{ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()).
|
||||||
handle_out({disconnect, ?RC_SUCCESS}, Channel)
|
|
||||||
end).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for handle_call
|
%% Test cases for handle_call
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_handle_call_kick(_) ->
|
t_handle_call_kick(_) ->
|
||||||
error('TODO').
|
{shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()).
|
||||||
|
|
||||||
t_handle_call_discard(_) ->
|
t_handle_call_discard(_) ->
|
||||||
error('TODO').
|
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
|
||||||
|
{shutdown, discarded, ok, Packet, _Channel}
|
||||||
|
= emqx_channel:handle_call(discard, channel()).
|
||||||
|
|
||||||
t_handle_call_takeover(_) ->
|
t_handle_call_takeover_begin(_) ->
|
||||||
error('TODO').
|
{reply, undefined, _Channel}
|
||||||
|
= emqx_channel:handle_call({takeover, 'begin'}, channel()).
|
||||||
|
|
||||||
|
t_handle_call_takeover_end(_) ->
|
||||||
|
ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
|
||||||
|
{shutdown, takeovered, [], _Channel}
|
||||||
|
= emqx_channel:handle_call({takeover, 'end'}, channel()).
|
||||||
|
|
||||||
t_handle_call_unexpected(_) ->
|
t_handle_call_unexpected(_) ->
|
||||||
error('TODO').
|
{reply, ignored, _Channel} = emqx_channel:handle_call(unexpected_req, channel()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for handle_info
|
%% Test cases for handle_info
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_handle_info_subscribe(_) ->
|
t_handle_info_subscribe(_) ->
|
||||||
error('TODO').
|
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
||||||
|
{ok, _Chan} = emqx_channel:handle_info({subscribe, topic_filters()}, channel()).
|
||||||
|
|
||||||
t_handle_info_unsubscribe(_) ->
|
t_handle_info_unsubscribe(_) ->
|
||||||
error('TODO').
|
ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end),
|
||||||
|
{ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()).
|
||||||
|
|
||||||
t_handle_info_sock_closed(_) ->
|
t_handle_info_sock_closed(_) ->
|
||||||
error('TODO').
|
{ok, _Chan} = emqx_channel:handle_out({sock_closed, reason}, channel(#{conn_state => disconnected})).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for handle_timeout
|
%% Test cases for handle_timeout
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_handle_timeout_emit_stats(_) ->
|
t_handle_timeout_emit_stats(_) ->
|
||||||
with_chan(fun(Channel) -> 'TODO' end).
|
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {emit_stats, []}, channel()).
|
||||||
|
|
||||||
t_handle_timeout_keepalive(_) ->
|
t_handle_timeout_keepalive(_) ->
|
||||||
with_chan(fun(Channel) -> 'TODO' end).
|
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, channel()).
|
||||||
|
|
||||||
t_handle_timeout_retry_delivery(_) ->
|
t_handle_timeout_retry_delivery(_) ->
|
||||||
with_chan(fun(Channel) -> 'TODO' end).
|
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), retry_delivery, channel()).
|
||||||
|
|
||||||
t_handle_timeout_expire_awaiting_rel(_) ->
|
t_handle_timeout_expire_awaiting_rel(_) ->
|
||||||
with_chan(fun(Channel) -> 'TODO' end).
|
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()).
|
||||||
|
|
||||||
t_handle_timeout_expire_session(_) ->
|
t_handle_timeout_expire_session(_) ->
|
||||||
with_chan(fun(Channel) -> 'TODO' end).
|
{shutdown, expired, _Chan} = emqx_channel:handle_timeout(make_ref(), expire_awaiting_rel, channel()).
|
||||||
|
|
||||||
t_handle_timeout_will_message(_) ->
|
t_handle_timeout_will_message(_) ->
|
||||||
with_chan(fun(Channel) -> 'TODO' end).
|
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), will_message, channel()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Test cases for ensure_timer
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
t_ensure_timer(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_reset_timer(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_alive_timer(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_retry_timer(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_await_timer(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_expire_timer(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_will_timer(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for internal functions
|
%% Test cases for internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_enrich_conninfo(_) ->
|
t_enrich_conninfo(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
{ok, _Chan} = emqx_channel:enrich_conninfo(connpkt(), channel()).
|
||||||
|
|
||||||
t_enrich_client(_) ->
|
t_enrich_client(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
{ok, _ConnPkt, _Chan} = emqx_channel:enrich_client(connpkt(), channel()).
|
||||||
|
|
||||||
t_check_banned(_) ->
|
t_check_banned(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
ok = emqx_channel:check_banned(connpkt(), channel()).
|
||||||
|
|
||||||
t_check_flapping(_) ->
|
t_check_flapping(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
ok = emqx_channel:check_flapping(connpkt(), channel()).
|
||||||
|
|
||||||
t_auth_connect(_) ->
|
t_auth_connect(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {ok, #{}} end),
|
||||||
|
{ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()).
|
||||||
|
|
||||||
t_process_alias(_) ->
|
t_process_alias(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
|
||||||
|
Channel = emqx_channel:set_field(topic_aliases, #{1 => <<"t">>}, channel()),
|
||||||
|
{ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan}
|
||||||
|
= emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
|
||||||
|
|
||||||
t_check_pub_acl(_) ->
|
t_check_pub_acl(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
|
||||||
|
ok = emqx_channel:check_pub_acl(Publish, channel()).
|
||||||
|
|
||||||
t_check_pub_alias(_) ->
|
t_check_pub_alias(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
|
||||||
|
ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, channel()).
|
||||||
|
|
||||||
t_check_subscribe(_) ->
|
t_check_subscribe(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()).
|
||||||
|
|
||||||
t_check_sub_acl(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_check_sub_caps(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_enrich_subid(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_enrich_subopts(_) ->
|
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
|
||||||
|
|
||||||
t_enrich_caps(_) ->
|
t_enrich_caps(_) ->
|
||||||
with_chan(fun(Channel) -> error('TODO') end).
|
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
|
||||||
|
ok = meck:expect(emqx_mqtt_caps, get_caps,
|
||||||
|
fun(_Zone) ->
|
||||||
|
#{max_packet_size => 1024,
|
||||||
|
max_qos_allowed => ?QOS_2,
|
||||||
|
retain_available => true,
|
||||||
|
max_topic_alias => 10,
|
||||||
|
shared_subscription => true,
|
||||||
|
wildcard_subscription => true
|
||||||
|
}
|
||||||
|
end),
|
||||||
|
AckProps = emqx_channel:enrich_caps(#{}, channel()),
|
||||||
|
?assertMatch(#{'Retain-Available' := 1,
|
||||||
|
'Maximum-Packet-Size' := 1024,
|
||||||
|
'Topic-Alias-Maximum' := 10,
|
||||||
|
'Wildcard-Subscription-Available' := 1,
|
||||||
|
'Subscription-Identifier-Available' := 1,
|
||||||
|
'Shared-Subscription-Available' := 1,
|
||||||
|
'Maximum-QoS' := ?QOS_2
|
||||||
|
}, AckProps),
|
||||||
|
ok = meck:unload(emqx_mqtt_caps).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for terminate
|
%% Test cases for terminate
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_terminate(_) ->
|
t_terminate(_) ->
|
||||||
with_chan(
|
ok = emqx_channel:terminate(normal, channel()),
|
||||||
fun(Channel) ->
|
ok = emqx_channel:terminate(sock_error, channel(#{conn_state => connected})),
|
||||||
'TODO'
|
ok = emqx_channel:terminate({shutdown, kicked}, channel(#{conn_state => connected})).
|
||||||
end).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
with_connected_channel(TestFun) ->
|
channel() -> channel(#{}).
|
||||||
with_chan(
|
channel(InitFields) ->
|
||||||
fun(Channel) ->
|
maps:fold(fun(Field, Value, Channel) ->
|
||||||
TestFun(emqx_channel:set_field(conn_state, connected, Channel))
|
emqx_channel:set_field(Field, Value, Channel)
|
||||||
end).
|
end, default_channel(), InitFields).
|
||||||
|
|
||||||
with_chan(TestFun) ->
|
default_channel() ->
|
||||||
with_chan(#{}, TestFun).
|
Channel = emqx_channel:init(?DEFAULT_CONNINFO, [{zone, zone}]),
|
||||||
|
Channel1 = emqx_channel:set_field(conn_state, connected, Channel),
|
||||||
|
emqx_channel:set_field(clientinfo, clientinfo(), Channel1).
|
||||||
|
|
||||||
with_chan(ConnInfo, TestFun) ->
|
clientinfo() -> clientinfo(#{}).
|
||||||
ConnInfo1 = maps:merge(?DEFAULT_CONNINFO, ConnInfo),
|
clientinfo(InitProps) ->
|
||||||
ClientInfo = #{zone => <<"external">>,
|
maps:merge(#{zone => zone,
|
||||||
protocol => mqtt,
|
protocol => mqtt,
|
||||||
peerhost => {127,0,0,1},
|
peerhost => {127,0,0,1},
|
||||||
clientid => <<"clientid">>,
|
clientid => <<"clientid">>,
|
||||||
username => <<"username">>,
|
username => <<"username">>,
|
||||||
peercert => undefined,
|
is_superuser => false,
|
||||||
is_bridge => false,
|
peercert => undefined,
|
||||||
is_superuser => false,
|
mountpoint => undefined
|
||||||
mountpoint => undefined
|
}, InitProps).
|
||||||
},
|
|
||||||
Channel = emqx_channel:init(ConnInfo1, [{zone, testing}]),
|
topic_filters() ->
|
||||||
Session = emqx_session:init(ClientInfo, ConnInfo1),
|
[{<<"+">>, ?DEFAULT_SUBOPTS}, {<<"#">>, ?DEFAULT_SUBOPTS}].
|
||||||
Channel1 = emqx_channel:set_field(clientinfo, ClientInfo, Channel),
|
|
||||||
TestFun(emqx_channel:set_field(session, Session, Channel1)).
|
connpkt() ->
|
||||||
|
#mqtt_packet_connect{
|
||||||
|
proto_name = <<"MQTT">>,
|
||||||
|
proto_ver = ?MQTT_PROTO_V4,
|
||||||
|
is_bridge = false,
|
||||||
|
clean_start = true,
|
||||||
|
keepalive = 30,
|
||||||
|
properties = undefined,
|
||||||
|
clientid = <<"clientid">>,
|
||||||
|
username = <<"username">>,
|
||||||
|
password = <<"passwd">>
|
||||||
|
}.
|
||||||
|
|
||||||
|
session() -> session(#{}).
|
||||||
|
session(InitFields) when is_map(InitFields) ->
|
||||||
|
maps:fold(fun(Field, Value, Session) ->
|
||||||
|
emqx_session:set_field(Field, Value, Session)
|
||||||
|
end,
|
||||||
|
emqx_session:init(#{zone => zone}, #{receive_maximum => 0}),
|
||||||
|
InitFields).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue