Add more test cases for connection, channel and session modules

This commit is contained in:
Feng Lee 2019-10-18 18:53:31 +08:00
parent a0e72fd040
commit cd4adbada0
9 changed files with 1037 additions and 660 deletions

View File

@ -486,3 +486,4 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -24,6 +24,11 @@
-logger_header("[Channel]"). -logger_header("[Channel]").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-export([ info/1 -export([ info/1
, info/2 , info/2
, attrs/1 , attrs/1
@ -31,9 +36,6 @@
, caps/1 , caps/1
]). ]).
%% Test Exports
-export([set_field/3]).
-export([ init/2 -export([ init/2
, handle_in/2 , handle_in/2
, handle_out/2 , handle_out/2
@ -43,6 +45,13 @@
, terminate/2 , terminate/2
]). ]).
-export([ recvd/2
, sent/2
]).
%% export for ct
-export([set_field/3]).
-import(emqx_misc, -import(emqx_misc,
[ run_fold/3 [ run_fold/3
, pipeline/3 , pipeline/3
@ -219,17 +228,17 @@ init_gc_state(Zone) ->
%% Handle incoming packet %% Handle incoming packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(handle_in(Bytes :: pos_integer() | emqx_types:packet(), channel()) -spec(recvd(pos_integer(), channel()) -> channel()).
recvd(Bytes, Channel) ->
ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
-spec(handle_in(emqx_types:packet(), channel())
-> {ok, channel()} -> {ok, channel()}
| {ok, output(), channel()} | {ok, output(), channel()}
| {stop, Reason :: term(), channel()} | {shutdown, Reason :: term(), channel()}
| {stop, Reason :: term(), output(), channel()}). | {shutdown, Reason :: term(), output(), channel()}).
handle_in(Bytes, Channel) when is_integer(Bytes) ->
NChannel = maybe_gc_and_check_oom(Bytes, Channel),
{ok, ensure_timer(stats_timer, NChannel)};
handle_in(?CONNECT_PACKET(_), Channel = #channel{conn_state = connected}) -> handle_in(?CONNECT_PACKET(_), Channel = #channel{conn_state = connected}) ->
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
case pipeline([fun enrich_conninfo/2, case pipeline([fun enrich_conninfo/2,
@ -243,7 +252,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
process_connect(NConnPkt, NChannel); process_connect(NConnPkt, NChannel);
{error, ReasonCode, NChannel} -> {error, ReasonCode, NChannel} ->
ReasonName = emqx_reason_codes:formalized(connack, ReasonCode), ReasonName = emqx_reason_codes:formalized(connack, ReasonCode),
handle_out({connack, ReasonName, ConnPkt}, NChannel) handle_out(connack, {ReasonName, ConnPkt}, NChannel)
end; end;
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
@ -251,7 +260,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
case emqx_packet:check(Packet) of case emqx_packet:check(Packet) of
ok -> handle_publish(Packet, NChannel); ok -> handle_publish(Packet, NChannel);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out({disconnect, ReasonCode}, NChannel) handle_out(disconnect, ReasonCode, NChannel)
end; end;
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
@ -281,26 +290,27 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
NChannel = Channel1#channel{session = NSession}, NChannel = Channel1#channel{session = NSession},
handle_out({pubrel, PacketId, ?RC_SUCCESS}, NChannel); handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]), ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrec.inuse'), ok = emqx_metrics:inc('packets.pubrec.inuse'),
handle_out({pubrel, PacketId, RC}, Channel1); handle_out(pubrel, {PacketId, RC}, Channel1);
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrec.missed'), ok = emqx_metrics:inc('packets.pubrec.missed'),
handle_out({pubrel, PacketId, RC}, Channel1) handle_out(pubrel, {PacketId, RC}, Channel1)
end; end;
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
Channel1 = inc_pub_stats(pubrel_in, Channel), Channel1 = inc_pub_stats(pubrel_in, Channel),
case emqx_session:pubrel(PacketId, Session) of case emqx_session:pubrel(PacketId, Session) of
{ok, NSession} -> {ok, NSession} ->
handle_out({pubcomp, PacketId, ?RC_SUCCESS}, Channel1#channel{session = NSession}); Channel2 = Channel1#channel{session = NSession},
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, Channel2);
{error, NotFound} -> {error, NotFound} ->
ok = emqx_metrics:inc('packets.pubrel.missed'), ok = emqx_metrics:inc('packets.pubrel.missed'),
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
handle_out({pubcomp, PacketId, NotFound}, Channel1) handle_out(pubcomp, {PacketId, NotFound}, Channel1)
end; end;
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
@ -324,9 +334,9 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
parse_topic_filters(TopicFilters)), parse_topic_filters(TopicFilters)),
TopicFilters2 = enrich_subid(Properties, TopicFilters1), TopicFilters2 = enrich_subid(Properties, TopicFilters1),
{ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel), {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
handle_out({suback, PacketId, ReasonCodes}, NChannel); handle_out(suback, {PacketId, ReasonCodes}, NChannel);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out({disconnect, ReasonCode}, Channel) handle_out(disconnect, ReasonCode, Channel)
end; end;
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
@ -336,9 +346,9 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
[ClientInfo, Properties], [ClientInfo, Properties],
parse_topic_filters(TopicFilters)), parse_topic_filters(TopicFilters)),
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
handle_out({unsuback, PacketId, ReasonCodes}, NChannel); handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out({disconnect, ReasonCode}, Channel) handle_out(disconnect, ReasonCode, Channel)
end; end;
handle_in(?PACKET(?PINGREQ), Channel) -> handle_in(?PACKET(?PINGREQ), Channel) ->
@ -355,7 +365,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
Interval = emqx_mqtt_props:get('Session-Expiry-Interval', Properties, OldInterval), Interval = emqx_mqtt_props:get('Session-Expiry-Interval', Properties, OldInterval),
if if
OldInterval == 0 andalso Interval > OldInterval -> OldInterval == 0 andalso Interval > OldInterval ->
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel1); handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel1);
Interval == 0 -> Interval == 0 ->
shutdown(ReasonName, Channel1); shutdown(ReasonName, Channel1);
true -> true ->
@ -364,7 +374,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
end; end;
handle_in(?AUTH_PACKET(), Channel) -> handle_in(?AUTH_PACKET(), Channel) ->
handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel); handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(Reason, Channel); shutdown(Reason, Channel);
@ -373,7 +383,7 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
handle_in({frame_error, _Reason}, Channel = #channel{conn_state = connected}) -> handle_in({frame_error, _Reason}, Channel = #channel{conn_state = connected}) ->
handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel); handle_out(disconnect, ?RC_MALFORMED_PACKET, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?LOG(error, "Unexpected frame error: ~p", [Reason]), ?LOG(error, "Unexpected frame error: ~p", [Reason]),
@ -381,7 +391,7 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected})
handle_in(Packet, Channel) -> handle_in(Packet, Channel) ->
?LOG(error, "Unexpected incoming: ~p", [Packet]), ?LOG(error, "Unexpected incoming: ~p", [Packet]),
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel). handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process Connect %% Process Connect
@ -392,18 +402,18 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
{ok, #{session := Session, present := false}} -> {ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session}, NChannel = Channel#channel{session = Session},
handle_out({connack, ?RC_SUCCESS, sp(false), ConnPkt}, NChannel); handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
{ok, #{session := Session, present := true, pendings := Pendings}} -> {ok, #{session := Session, present := true, pendings := Pendings}} ->
%%TODO: improve later. %%TODO: improve later.
NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
NChannel = Channel#channel{session = Session, NChannel = Channel#channel{session = Session,
resuming = true, resuming = true,
pendings = NPendings}, pendings = NPendings},
handle_out({connack, ?RC_SUCCESS, sp(true), ConnPkt}, NChannel); handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
{error, Reason} -> {error, Reason} ->
%% TODO: Unknown error? %% TODO: Unknown error?
?LOG(error, "Failed to open session: ~p", [Reason]), ?LOG(error, "Failed to open session: ~p", [Reason]),
handle_out({connack, ?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel) handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -426,7 +436,7 @@ handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
{error, ReasonCode, NChannel} -> {error, ReasonCode, NChannel} ->
?LOG(warning, "Cannot publish message to ~s due to ~s", ?LOG(warning, "Cannot publish message to ~s due to ~s",
[Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]), [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]),
handle_out({disconnect, ReasonCode}, NChannel) handle_out(disconnect, ReasonCode, NChannel)
end. end.
process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) -> process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) ->
@ -442,7 +452,7 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
[] -> ?RC_NO_MATCHING_SUBSCRIBERS; [] -> ?RC_NO_MATCHING_SUBSCRIBERS;
_ -> ?RC_SUCCESS _ -> ?RC_SUCCESS
end, end,
handle_out({puback, PacketId, ReasonCode}, Channel); handle_out(puback, {PacketId, ReasonCode}, Channel);
process_publish(PacketId, Msg = #message{qos = ?QOS_2}, process_publish(PacketId, Msg = #message{qos = ?QOS_2},
Channel = #channel{session = Session}) -> Channel = #channel{session = Session}) ->
@ -453,14 +463,14 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2},
_ -> ?RC_SUCCESS _ -> ?RC_SUCCESS
end, end,
NChannel = Channel#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
handle_out({pubrec, PacketId, RC}, ensure_timer(await_timer, NChannel)); handle_out(pubrec, {PacketId, RC}, ensure_timer(await_timer, NChannel));
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = emqx_metrics:inc('packets.publish.inuse'), ok = emqx_metrics:inc('packets.publish.inuse'),
handle_out({pubrec, PacketId, RC}, Channel); handle_out(pubrec, {PacketId, RC}, Channel);
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full", [PacketId]), ?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full", [PacketId]),
ok = emqx_metrics:inc('messages.qos2.dropped'), ok = emqx_metrics:inc('messages.qos2.dropped'),
handle_out({pubrec, PacketId, RC}, Channel) handle_out(pubrec, {PacketId, RC}, Channel)
end. end.
publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
@ -528,15 +538,15 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
%% Handle outgoing packet %% Handle outgoing packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(handle_out(integer()|term(), channel()) -spec(sent(pos_integer(), channel()) -> channel()).
sent(Bytes, Channel) ->
ensure_timer(stats_timer, maybe_gc_and_check_oom(Bytes, Channel)).
-spec(handle_out(term(), channel())
-> {ok, channel()} -> {ok, channel()}
| {ok, output(), channel()} | {ok, output(), channel()}
| {stop, Reason :: term(), channel()} | {shutdown, Reason :: term(), channel()}
| {stop, Reason :: term(), output(), channel()}). | {shutdown, Reason :: term(), output(), channel()}).
handle_out(Bytes, Channel) when is_integer(Bytes) ->
NChannel = maybe_gc_and_check_oom(Bytes, Channel),
{ok, ensure_timer(stats_timer, NChannel)};
handle_out(Delivers, Channel = #channel{conn_state = disconnected, handle_out(Delivers, Channel = #channel{conn_state = disconnected,
session = Session}) session = Session})
when is_list(Delivers) -> when is_list(Delivers) ->
@ -557,42 +567,6 @@ handle_out(Delivers, Channel = #channel{session = Session}) when is_list(Deliver
{ok, Channel#channel{session = NSession}} {ok, Channel#channel{session = NSession}}
end; end;
handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
AckProps = run_fold([fun enrich_caps/2,
fun enrich_server_keepalive/2,
fun enrich_assigned_clientid/2], #{}, Channel),
ConnInfo1 = ConnInfo#{connected_at => erlang:system_time(second)},
Channel1 = Channel#channel{conninfo = ConnInfo1,
will_msg = emqx_packet:will_msg(ConnPkt),
conn_state = connected,
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
},
Channel2 = ensure_keepalive(AckProps, Channel1),
ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
case maybe_resume_session(Channel2) of
ignore ->
{ok, [{enter, connected}, {outgoing, AckPacket}], Channel2};
{ok, Publishes, NSession} ->
Channel3 = Channel2#channel{session = NSession,
resuming = false,
pendings = []},
{ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3),
{ok, [{enter, connected}, {outgoing, [AckPacket|Packets]}], Channel3}
end;
handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of
?MQTT_PROTO_V5 -> ReasonCode;
_Ver -> emqx_reason_codes:compat(connack, ReasonCode)
end,
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel);
handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
Packets = lists:foldl( Packets = lists:foldl(
fun(Publish, Acc) -> fun(Publish, Acc) ->
@ -618,40 +592,80 @@ handle_out({publish, PacketId, Msg}, Channel =
Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2), Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
{ok, emqx_message:to_packet(PacketId, Msg3), Channel}; {ok, emqx_message:to_packet(PacketId, Msg3), Channel};
handle_out({puback, PacketId, ReasonCode}, Channel) -> handle_out(Data, Channel) ->
?LOG(error, "Unexpected outgoing: ~p", [Data]),
{ok, Channel}.
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt},
Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
AckProps = run_fold([fun enrich_caps/2,
fun enrich_server_keepalive/2,
fun enrich_assigned_clientid/2], #{}, Channel),
ConnInfo1 = ConnInfo#{connected_at => erlang:system_time(second)},
Channel1 = Channel#channel{conninfo = ConnInfo1,
will_msg = emqx_packet:will_msg(ConnPkt),
conn_state = connected,
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
},
Channel2 = ensure_keepalive(AckProps, Channel1),
ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
case maybe_resume_session(Channel2) of
ignore ->
{ok, [{enter, connected}, {outgoing, AckPacket}], Channel2};
{ok, Publishes, NSession} ->
Channel3 = Channel2#channel{session = NSession,
resuming = false,
pendings = []},
{ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3),
{ok, [{enter, connected}, {outgoing, [AckPacket|Packets]}], Channel3}
end;
handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of
?MQTT_PROTO_V5 -> ReasonCode;
_Ver -> emqx_reason_codes:compat(connack, ReasonCode)
end,
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel);
handle_out(puback, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)}; {ok, ?PUBACK_PACKET(PacketId, ReasonCode), inc_pub_stats(puback_out, Channel)};
handle_out({pubrel, PacketId, ReasonCode}, Channel) -> handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)}; {ok, ?PUBREL_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrel_out, Channel)};
handle_out({pubrec, PacketId, ReasonCode}, Channel) -> handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)}; {ok, ?PUBREC_PACKET(PacketId, ReasonCode), inc_pub_stats(pubrec_out, Channel)};
handle_out({pubcomp, PacketId, ReasonCode}, Channel) -> handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)}; {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), inc_pub_stats(pubcomp_out, Channel)};
handle_out({suback, PacketId, ReasonCodes}, handle_out(suback, {PacketId, ReasonCodes},
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
{ok, ?SUBACK_PACKET(PacketId, ReasonCodes), Channel}; {ok, ?SUBACK_PACKET(PacketId, ReasonCodes), Channel};
handle_out({suback, PacketId, ReasonCodes}, Channel) -> handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes], ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
{ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel}; {ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel};
handle_out({unsuback, PacketId, ReasonCodes}, handle_out(unsuback, {PacketId, ReasonCodes},
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
{ok, ?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel}; {ok, ?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel};
handle_out({unsuback, PacketId, _ReasonCodes}, Channel) -> handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) ->
{ok, ?UNSUBACK_PACKET(PacketId), Channel}; {ok, ?UNSUBACK_PACKET(PacketId), Channel};
handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) -> handle_out(disconnect, ReasonCode, Channel = #channel{conninfo = #{proto_ver := ProtoVer}})
when is_integer(ReasonCode) ->
ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer), ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer),
handle_out({disconnect, ReasonCode, ReasonName}, Channel); handle_out(disconnect, {ReasonCode, ReasonName}, Channel);
handle_out({disconnect, ReasonCode, ReasonName}, handle_out(disconnect, {ReasonCode, ReasonName}, Channel = #channel{conninfo = ConnInfo}) ->
Channel = #channel{conninfo = #{proto_ver := ProtoVer, #{proto_ver := ProtoVer, expiry_interval := ExpiryInterval} = ConnInfo,
expiry_interval := ExpiryInterval}}) ->
case {ExpiryInterval, ProtoVer} of case {ExpiryInterval, ProtoVer} of
{0, ?MQTT_PROTO_V5} -> {0, ?MQTT_PROTO_V5} ->
shutdown(ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel); shutdown(ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel);
@ -673,7 +687,7 @@ handle_out({disconnect, ReasonCode, ReasonName},
{ok, {close, ReasonName}, NChannel} {ok, {close, ReasonName}, NChannel}
end; end;
handle_out({Type, Data}, Channel) -> handle_out(Type, Data, Channel) ->
?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]), ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
{ok, Channel}. {ok, Channel}.
@ -683,38 +697,40 @@ handle_out({Type, Data}, Channel) ->
-spec(handle_call(Req :: term(), channel()) -spec(handle_call(Req :: term(), channel())
-> {reply, Reply :: term(), channel()} -> {reply, Reply :: term(), channel()}
| {stop, Reason :: term(), Reply :: term(), channel()}). | {shutdown, Reason :: term(), Reply :: term(), channel()}).
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
{stop, {shutdown, kicked}, ok, Channel}; shutdown(kicked, ok, Channel);
handle_call(discard, Channel = #channel{conn_state = connected}) -> handle_call(discard, Channel = #channel{conn_state = connected}) ->
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
{stop, {shutdown, discarded}, ok, Packet, Channel}; {shutdown, discarded, ok, Packet, Channel};
handle_call(discard, Channel = #channel{conn_state = disconnected}) -> handle_call(discard, Channel = #channel{conn_state = disconnected}) ->
{stop, {shutdown, discarded}, ok, Channel}; shutdown(discarded, ok, Channel);
%% Session Takeover %% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
{reply, Session, Channel#channel{takeover = true}}; reply(Session, Channel#channel{takeover = true});
handle_call({takeover, 'end'}, Channel = #channel{session = Session, handle_call({takeover, 'end'}, Channel = #channel{session = Session,
pendings = Pendings}) -> pendings = Pendings}) ->
ok = emqx_session:takeover(Session), ok = emqx_session:takeover(Session),
%% TODO: Should not drain deliver here
Delivers = emqx_misc:drain_deliver(), Delivers = emqx_misc:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings), AllPendings = lists:append(Delivers, Pendings),
{stop, {shutdown, takeovered}, AllPendings, Channel}; shutdown(takeovered, AllPendings, Channel);
handle_call(Req, Channel) -> handle_call(Req, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, Channel}. reply(ignored, Channel).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle Info %% Handle Info
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(handle_info(Info :: term(), channel()) -spec(handle_info(Info :: term(), channel())
-> ok | {ok, channel()} | {stop, Reason :: term(), channel()}). -> ok | {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
TopicFilters1 = emqx_hooks:run_fold('client.subscribe', TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
[ClientInfo, #{'Internal' => true}], [ClientInfo, #{'Internal' => true}],
@ -769,7 +785,7 @@ handle_info(Info, Channel) ->
-spec(handle_timeout(reference(), Msg :: term(), channel()) -spec(handle_timeout(reference(), Msg :: term(), channel())
-> {ok, channel()} -> {ok, channel()}
| {ok, Result :: term(), channel()} | {ok, Result :: term(), channel()}
| {stop, Reason :: term(), channel()}). | {shutdown, Reason :: term(), channel()}).
handle_timeout(TRef, {emit_stats, Stats}, handle_timeout(TRef, {emit_stats, Stats},
Channel = #channel{clientinfo = #{clientid := ClientId}, Channel = #channel{clientinfo = #{clientid := ClientId},
timers = #{stats_timer := TRef}}) -> timers = #{stats_timer := TRef}}) ->
@ -784,7 +800,7 @@ handle_timeout(TRef, {keepalive, StatVal},
NChannel = Channel#channel{keepalive = NKeepalive}, NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)}; {ok, reset_timer(alive_timer, NChannel)};
{error, timeout} -> {error, timeout} ->
handle_out({disconnect, ?RC_KEEP_ALIVE_TIMEOUT}, Channel) handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
end; end;
handle_timeout(TRef, retry_delivery, handle_timeout(TRef, retry_delivery,
@ -1195,15 +1211,21 @@ maybe_gc_and_check_oom(Oct, Channel = #channel{clientinfo = #{zone := Zone},
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-compile({inline, [reply/2]}).
reply(Reply, Channel) ->
{reply, Reply, Channel}.
-compile({inline, [shutdown/2]}).
shutdown(Reason, Channel) ->
{shutdown, Reason, Channel}.
-compile({inline, [shutdown/3]}).
shutdown(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
sp(true) -> 1; sp(true) -> 1;
sp(false) -> 0. sp(false) -> 0.
flag(true) -> 1; flag(true) -> 1;
flag(false) -> 0. flag(false) -> 0.
shutdown(Reason, Channel) ->
{stop, {shutdown, Reason}, Channel}.
shutdown(Reason, Packets, Channel) ->
{stop, {shutdown, Reason}, Packets, Channel}.

View File

@ -24,6 +24,11 @@
-logger_header("[MQTT]"). -logger_header("[MQTT]").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
%% API %% API
-export([ start_link/3 -export([ start_link/3
, stop/1 , stop/1
@ -121,9 +126,7 @@ info(active_n, #state{active_n = ActiveN}) ->
info(pub_limit, #state{pub_limit = PubLimit}) -> info(pub_limit, #state{pub_limit = PubLimit}) ->
limit_info(PubLimit); limit_info(PubLimit);
info(rate_limit, #state{rate_limit = RateLimit}) -> info(rate_limit, #state{rate_limit = RateLimit}) ->
limit_info(RateLimit); limit_info(RateLimit).
info(channel, #state{channel = Channel}) ->
emqx_channel:info(Channel).
limit_info(Limit) -> limit_info(Limit) ->
emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit). emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit).
@ -158,17 +161,9 @@ init(Parent, Transport, RawSocket, Options) ->
case Transport:wait(RawSocket) of case Transport:wait(RawSocket) of
{ok, Socket} -> {ok, Socket} ->
do_init(Parent, Transport, Socket, Options); do_init(Parent, Transport, Socket, Options);
{error, Reason} when Reason =:= enotconn;
Reason =:= einval;
Reason =:= closed ->
Transport:fast_close(RawSocket),
exit(normal);
{error, timeout} ->
Transport:fast_close(RawSocket),
exit({shutdown, ssl_upgrade_timeout});
{error, Reason} -> {error, Reason} ->
Transport:fast_close(RawSocket), ok = Transport:fast_close(RawSocket),
exit(Reason) exit_on_sock_error(Reason)
end. end.
do_init(Parent, Transport, Socket, Options) -> do_init(Parent, Transport, Socket, Options) ->
@ -209,15 +204,10 @@ do_init(Parent, Transport, Socket, Options) ->
}, },
case activate_socket(State) of case activate_socket(State) of
{ok, NState} -> {ok, NState} ->
recvloop(NState, #{hibernate_after => HibAfterTimeout}); hibernate(NState, #{hibernate_after => HibAfterTimeout});
{error, Reason} when Reason =:= einval;
Reason =:= enotconn;
Reason =:= closed ->
Transport:fast_close(Socket),
exit(normal);
{error, Reason} -> {error, Reason} ->
Transport:fast_close(Socket), ok = Transport:fast_close(Socket),
erlang:exit({shutdown, Reason}) exit_on_sock_error(Reason)
end. end.
-compile({inline, [init_limiter/1]}). -compile({inline, [init_limiter/1]}).
@ -225,6 +215,15 @@ init_limiter(undefined) -> undefined;
init_limiter({Rate, Burst}) -> init_limiter({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst). esockd_rate_limit:new(Rate, Burst).
exit_on_sock_error(Reason) when Reason =:= einval;
Reason =:= enotconn;
Reason =:= closed ->
erlang:exit(normal);
exit_on_sock_error(timeout) ->
erlang:exit({shutdown, ssl_upgrade_timeout});
exit_on_sock_error(Reason) ->
erlang:exit({shutdown, Reason}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Recv Loop %% Recv Loop
@ -291,8 +290,8 @@ handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel})
Oct = iolist_size(Data), Oct = iolist_size(Data),
emqx_pd:update_counter(incoming_bytes, Oct), emqx_pd:update_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct), ok = emqx_metrics:inc('bytes.received', Oct),
{ok, NChannel} = emqx_channel:handle_in(Oct, Channel), NChannel = emqx_channel:recvd(Oct, Channel),
process_incoming(Data, State#state{channel = NChannel}); parse_incoming(Data, State#state{channel = NChannel});
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
State = #state{idle_timer = IdleTimer}) -> State = #state{idle_timer = IdleTimer}) ->
@ -316,24 +315,16 @@ handle_msg({Closed, _Sock}, State)
handle_msg({Passive, _Sock}, State) handle_msg({Passive, _Sock}, State)
when Passive == tcp_passive; Passive == ssl_passive -> when Passive == tcp_passive; Passive == ssl_passive ->
%% Rate limit and activate socket here. %% Rate limit here:)
NState = ensure_rate_limit(State), NState = ensure_rate_limit(State),
case activate_socket(NState) of handle_info(activate_socket, NState);
{ok, NState} -> {ok, NState};
{error, Reason} ->
{ok, {sock_error, Reason}, NState}
end;
%% Rate limit timer expired. %% Rate limit timer expired.
handle_msg(activate_socket, State) -> handle_msg(activate_socket, State) ->
NState = State#state{sockstate = idle, NState = State#state{sockstate = idle,
limit_timer = undefined limit_timer = undefined
}, },
case activate_socket(NState) of handle_info(activate_socket, NState);
{ok, NState} -> {ok, NState};
{error, Reason} ->
{ok, {sock_error, Reason}, State}
end;
handle_msg(Deliver = {deliver, _Topic, _Msg}, handle_msg(Deliver = {deliver, _Topic, _Msg},
State = #state{channel = Channel}) -> State = #state{channel = Channel}) ->
@ -342,7 +333,8 @@ handle_msg(Deliver = {deliver, _Topic, _Msg},
handle_return(Result, State); handle_return(Result, State);
handle_msg({outgoing, Packets}, State) -> handle_msg({outgoing, Packets}, State) ->
{ok, handle_outgoing(Packets, State)}; NState = handle_outgoing(Packets, State),
{ok, NState};
%% something sent %% something sent
handle_msg({inet_reply, _Sock, ok}, _State) -> handle_msg({inet_reply, _Sock, ok}, _State) ->
@ -362,13 +354,10 @@ handle_msg(Msg, State) -> handle_info(Msg, State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Terminate %% Terminate
terminate(Reason, #state{transport = Transport, terminate(Reason, State = #state{channel = Channel}) ->
socket = Socket, ?LOG(debug, "Terminated due to ~p", [Reason]),
sockstate = SockSt,
channel = Channel}) ->
?LOG(debug, "Terminated for ~p", [Reason]),
SockSt =:= closed orelse Transport:fast_close(Socket),
emqx_channel:terminate(Reason, Channel), emqx_channel:terminate(Reason, Channel),
close_socket(State),
exit(Reason). exit(Reason).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -399,19 +388,19 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
case emqx_channel:handle_call(Req, Channel) of case emqx_channel:handle_call(Req, Channel) of
{reply, Reply, NChannel} -> {reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}}; {reply, Reply, State#state{channel = NChannel}};
{stop, Reason, Reply, NChannel} -> {shutdown, Reason, Reply, NChannel} ->
{stop, Reason, Reply, State#state{channel = NChannel}}; shutdown(Reason, Reply, State#state{channel = NChannel});
{stop, Reason, Reply, OutPacket, NChannel} -> {shutdown, Reason, Reply, OutPacket, NChannel} ->
NState = State#state{channel = NChannel}, NState = State#state{channel = NChannel},
NState1 = handle_outgoing(OutPacket, NState), NState1 = handle_outgoing(OutPacket, NState),
{stop, Reason, Reply, NState1} shutdown(Reason, Reply, NState1)
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle timeout %% Handle timeout
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
stop(idle_timeout, State); shutdown(idle_timeout, State);
handle_timeout(TRef, emit_stats, State) -> handle_timeout(TRef, emit_stats, State) ->
handle_timeout(TRef, {emit_stats, stats(State)}, State); handle_timeout(TRef, {emit_stats, stats(State)}, State);
@ -422,23 +411,19 @@ handle_timeout(TRef, keepalive, State = #state{transport = Transport,
{ok, [{recv_oct, RecvOct}]} -> {ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State); handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} -> {error, Reason} ->
handle_info({sockerr, Reason}, State) handle_info({sock_error, Reason}, State)
end; end;
handle_timeout(TRef, Msg, State = #state{channel = Channel}) -> handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State). handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process/Parse incoming data. %% Parse incoming data
-compile({inline, [process_incoming/2]}).
process_incoming(Data, State) ->
{Packets, NState} = parse_incoming(Data, State),
{ok, next_incoming_msgs(Packets), NState}.
-compile({inline, [parse_incoming/2]}). -compile({inline, [parse_incoming/2]}).
parse_incoming(Data, State) -> parse_incoming(Data, State) ->
parse_incoming(Data, [], State). {Packets, NState} = parse_incoming(Data, [], State),
{ok, next_incoming_msgs(Packets), NState}.
parse_incoming(<<>>, Packets, State) -> parse_incoming(<<>>, Packets, State) ->
{Packets, State}; {Packets, State};
@ -483,12 +468,12 @@ handle_return({ok, NChannel}, State) ->
{ok, State#state{channel = NChannel}}; {ok, State#state{channel = NChannel}};
handle_return({ok, Replies, NChannel}, State) -> handle_return({ok, Replies, NChannel}, State) ->
{ok, next_msgs(Replies), State#state{channel = NChannel}}; {ok, next_msgs(Replies), State#state{channel = NChannel}};
handle_return({stop, Reason, NChannel}, State) -> handle_return({shutdown, Reason, NChannel}, State) ->
stop(Reason, State#state{channel = NChannel}); shutdown(Reason, State#state{channel = NChannel});
handle_return({stop, Reason, OutPacket, NChannel}, State) -> handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
NState = State#state{channel = NChannel}, NState = State#state{channel = NChannel},
NState1 = handle_outgoing(OutPacket, NState), NState1 = handle_outgoing(OutPacket, NState),
stop(Reason, NState1). shutdown(Reason, NState1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packets %% Handle outgoing packets
@ -522,8 +507,7 @@ send(IoData, State = #state{transport = Transport,
ok = emqx_metrics:inc('bytes.sent', Oct), ok = emqx_metrics:inc('bytes.sent', Oct),
case Transport:async_send(Socket, IoData) of case Transport:async_send(Socket, IoData) of
ok -> ok ->
{ok, NChannel} = emqx_channel:handle_out(Oct, Channel), State#state{channel = emqx_channel:sent(Oct, Channel)};
State#state{channel = NChannel};
Error = {error, _Reason} -> Error = {error, _Reason} ->
%% Simulate an inet_reply to postpone handling the error %% Simulate an inet_reply to postpone handling the error
self() ! {inet_reply, Socket, Error}, State self() ! {inet_reply, Socket, Error}, State
@ -542,8 +526,16 @@ handle_info({enter, _}, State = #state{active_n = ActiveN,
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}), Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
handle_info({register, Attrs, stats(State)}, State); handle_info({register, Attrs, stats(State)}, State);
handle_info({sockerr, _Reason}, #state{sockstate = closed}) -> ok; handle_info(activate_socket, State) ->
handle_info({sockerr, Reason}, State) -> case activate_socket(State) of
{ok, NState} -> {ok, NState};
{error, Reason} ->
handle_info({sock_error, Reason}, State)
end;
%%TODO: this is not right
handle_info({sock_error, _Reason}, #state{sockstate = closed}) -> ok;
handle_info({sock_error, Reason}, State) ->
?LOG(debug, "Socket error: ~p", [Reason]), ?LOG(debug, "Socket error: ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
@ -578,6 +570,8 @@ activate_socket(State = #state{transport = Transport,
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Close Socket %% Close Socket
close_socket(State = #state{sockstate = closed}) ->
State;
close_socket(State = #state{transport = Transport, socket = Socket}) -> close_socket(State = #state{transport = Transport, socket = Socket}) ->
ok = Transport:fast_close(Socket), ok = Transport:fast_close(Socket),
State#state{sockstate = closed}. State#state{sockstate = closed}.
@ -641,7 +635,16 @@ next_msgs(Action) when is_tuple(Action) ->
next_msgs(Actions) when is_list(Actions) -> next_msgs(Actions) when is_list(Actions) ->
Actions. Actions.
shutdown(Reason, State) ->
stop({shutdown, Reason}, State).
shutdown(Reason, Reply, State) ->
stop({shutdown, Reason}, Reply, State).
-compile({inline, [stop/2]}). -compile({inline, [stop/2]}).
stop(Reason, State) -> stop(Reason, State) ->
{stop, Reason, State}. {stop, Reason, State}.
stop(Reason, Reply, State) ->
{stop, Reason, Reply, State}.

View File

@ -50,6 +50,11 @@
-logger_header("[Session]"). -logger_header("[Session]").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-export([init/2]). -export([init/2]).
-export([ info/1 -export([ info/1
@ -58,9 +63,6 @@
, stats/1 , stats/1
]). ]).
%% Exports for unit tests
-export([set_field/3]).
-export([ subscribe/4 -export([ subscribe/4
, unsubscribe/3 , unsubscribe/3
]). ]).
@ -84,6 +86,9 @@
-export([expire/2]). -export([expire/2]).
%% export for ct
-export([set_field/3]).
-export_type([session/0]). -export_type([session/0]).
-import(emqx_zone, [get_env/3]). -import(emqx_zone, [get_env/3]).

View File

@ -24,6 +24,11 @@
-logger_header("[MQTT/WS]"). -logger_header("[MQTT/WS]").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
%% API %% API
-export([ info/1 -export([ info/1
, stats/1 , stats/1
@ -188,8 +193,8 @@ websocket_handle({binary, Data}, State = #state{channel = Channel}) ->
?LOG(debug, "RECV ~p", [Data]), ?LOG(debug, "RECV ~p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
ok = inc_recv_stats(1, Oct), ok = inc_recv_stats(1, Oct),
{ok, NChannel} = emqx_channel:handle_in(Oct, Channel), {ok, NChannel} = emqx_channel:recvd(Oct, Channel),
process_incoming(Data, State#state{channel = NChannel}); parse_incoming(Data, State#state{channel = NChannel});
%% Pings should be replied with pongs, cowboy does it automatically %% Pings should be replied with pongs, cowboy does it automatically
%% Pongs can be safely ignored. Clause here simply prevents crash. %% Pongs can be safely ignored. Clause here simply prevents crash.
@ -282,12 +287,6 @@ handle_call(From, Req, State = #state{channel = Channel}) ->
stop(Reason, enqueue(OutPacket, NState)) stop(Reason, enqueue(OutPacket, NState))
end. end.
%%--------------------------------------------------------------------
%% Handle timeout
handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle Info %% Handle Info
@ -302,18 +301,24 @@ handle_info(Info, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_info(Info, Channel), State). handle_return(emqx_channel:handle_info(Info, Channel), State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process incoming data %% Handle timeout
process_incoming(<<>>, State) -> handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
%%--------------------------------------------------------------------
%% Parse incoming data
parse_incoming(<<>>, State) ->
{ok, State}; {ok, State};
process_incoming(Data, State = #state{parse_state = ParseState}) -> parse_incoming(Data, State = #state{parse_state = ParseState}) ->
try emqx_frame:parse(Data, ParseState) of try emqx_frame:parse(Data, ParseState) of
{more, NParseState} -> {more, NParseState} ->
{ok, State#state{parse_state = NParseState}}; {ok, State#state{parse_state = NParseState}};
{ok, Packet, Rest, NParseState} -> {ok, Packet, Rest, NParseState} ->
self() ! {incoming, Packet}, self() ! {incoming, Packet},
process_incoming(Rest, State#state{parse_state = NParseState}) parse_incoming(Rest, State#state{parse_state = NParseState})
catch catch
error:Reason:Stk -> error:Reason:Stk ->
?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data: ~p", ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data: ~p",
@ -343,9 +348,9 @@ handle_return({ok, NChannel}, State) ->
reply(State#state{channel= NChannel}); reply(State#state{channel= NChannel});
handle_return({ok, Replies, NChannel}, State) -> handle_return({ok, Replies, NChannel}, State) ->
reply(Replies, State#state{channel= NChannel}); reply(Replies, State#state{channel= NChannel});
handle_return({stop, Reason, NChannel}, State) -> handle_return({shutdown, Reason, NChannel}, State) ->
stop(Reason, State#state{channel = NChannel}); stop(Reason, State#state{channel = NChannel});
handle_return({stop, Reason, OutPacket, NChannel}, State) -> handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
NState = State#state{channel = NChannel}, NState = State#state{channel = NChannel},
stop(Reason, enqueue(OutPacket, NState)). stop(Reason, enqueue(OutPacket, NState)).
@ -356,7 +361,7 @@ handle_outgoing(Packets, State = #state{channel = Channel}) ->
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = inc_sent_stats(length(Packets), Oct), ok = inc_sent_stats(length(Packets), Oct),
{ok, NChannel} = emqx_channel:handle_out(Oct, Channel), NChannel = emqx_channel:sent(Oct, Channel),
{{binary, IoData}, State#state{channel = NChannel}}. {{binary, IoData}, State#state{channel = NChannel}}.
%% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1 %% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1

View File

@ -45,19 +45,43 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
%%--------------------------------------------------------------------
%% CT Callbacks
%%--------------------------------------------------------------------
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:boot_modules([router, broker]),
emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for handle_in %% Test cases for channel info/stats/caps
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_connect(_) -> t_chan_info(_) -> error('TODO').
t_chan_attrs(_) -> error('TODO').
t_chan_stats(_) -> error('TODO').
t_chan_caps(_) -> error('TODO').
t_chan_recvd(_) -> error('TODO').
t_chan_sent(_) -> error('TODO').
%%--------------------------------------------------------------------
%% Test cases for channel init
%%--------------------------------------------------------------------
t_chan_init(_) -> error('TODO').
%%--------------------------------------------------------------------
%% Test cases for channel handle_in
%%--------------------------------------------------------------------
t_handle_in_connect_packet(_) ->
ConnPkt = #mqtt_packet_connect{ ConnPkt = #mqtt_packet_connect{
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
proto_ver = ?MQTT_PROTO_V4, proto_ver = ?MQTT_PROTO_V4,
@ -69,35 +93,35 @@ t_handle_connect(_) ->
username = <<"username">>, username = <<"username">>,
password = <<"passwd">> password = <<"passwd">>
}, },
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}),
ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}), ExpectedOutput = [{enter, connected},{outgoing, ConnAck}],
ExpectedOutput = [{enter, connected},{outgoing, ConnAck}], {ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel),
{ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel), ?assertEqual(ExpectedOutput, Output),
?assertEqual(ExpectedOutput, Output), #{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1),
#{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1), ?assertEqual(<<"clientid">>, ClientId),
?assertEqual(<<"clientid">>, ClientId), ?assertEqual(<<"username">>, Username)
?assertEqual(<<"username">>, Username) end).
end).
t_handle_in_publish_qos0(_) -> t_handle_in_unexpected_connect_packet(_) ->
with_channel( error('TODO').
fun(Channel) ->
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
{ok, Channel1} = handle_in(Publish, Channel),
?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, Channel1))
end).
t_handle_in_publish_qos1(_) -> t_handle_in_qos0_publish(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), {ok, Channel1} = handle_in(Publish, Channel),
{ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, Channel), ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, Channel1))
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)) end).
end).
t_handle_publish_qos2(_) -> t_handle_in_qos1_publish(_) ->
with_channel( with_chan(fun(Channel) ->
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBACK_PACKET(1, RC), _} = handle_in(Publish, Channel),
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS))
end).
t_handle_in_qos2_publish(_) ->
with_chan(
fun(Channel) -> fun(Channel) ->
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, RC), Channel1} = handle_in(Publish1, Channel), {ok, ?PUBREC_PACKET(1, RC), Channel1} = handle_in(Publish1, Channel),
@ -109,14 +133,14 @@ t_handle_publish_qos2(_) ->
end). end).
t_handle_in_puback(_) -> t_handle_in_puback(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
{ok, Channel1} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel), {ok, Channel1} = handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel),
?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel1)) ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel1))
end). end).
t_handle_in_pubrec(_) -> t_handle_in_pubrec(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1} {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1}
= handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel), = handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel),
@ -124,7 +148,7 @@ t_handle_in_pubrec(_) ->
end). end).
t_handle_in_pubrel(_) -> t_handle_in_pubrel(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1} {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel1}
= handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel), = handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel),
@ -132,14 +156,14 @@ t_handle_in_pubrel(_) ->
end). end).
t_handle_in_pubcomp(_) -> t_handle_in_pubcomp(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
{ok, Channel1} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel), {ok, Channel1} = handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)) ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1))
end). end).
t_handle_subscribe(_) -> t_handle_in_subscribe(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
{ok, ?SUBACK_PACKET(10, [?QOS_0]), Channel1} {ok, ?SUBACK_PACKET(10, [?QOS_0]), Channel1}
@ -149,55 +173,68 @@ t_handle_subscribe(_) ->
?assertEqual(maps:from_list(TopicFilters), Subscriptions) ?assertEqual(maps:from_list(TopicFilters), Subscriptions)
end). end).
t_handle_unsubscribe(_) -> t_handle_in_unsubscribe(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
{ok, ?UNSUBACK_PACKET(11), Channel} {ok, ?UNSUBACK_PACKET(11), Channel}
= handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), Channel) = handle_in(?UNSUBSCRIBE_PACKET(11, #{}, [<<"+">>]), Channel)
end). end).
t_handle_pingreq(_) -> t_handle_in_pingreq(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, ?PACKET(?PINGRESP), Channel} = handle_in(?PACKET(?PINGREQ), Channel)
{ok, ?PACKET(?PINGRESP), Channel} = handle_in(?PACKET(?PINGREQ), Channel) end).
end).
t_handle_disconnect(_) -> t_handle_in_disconnect(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
{stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1))
?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)) end).
end).
t_handle_in_auth(_) -> t_handle_in_auth(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR), {stop, {shutdown, implementation_specific_error}, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel)
{stop, {shutdown, implementation_specific_error}, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel) end).
end).
%%-------------------------------------------------------------------- t_handle_in_frame_error(_) ->
%% Test cases for handle_deliver with_chan(fun(Channel) -> error('TODO') end).
%%--------------------------------------------------------------------
t_handle_deliver(_) -> t_handle_in_expected_packet(_) ->
with_connected_channel( with_chan(fun(Channel) -> error('TODO') end).
fun(Channel) ->
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}], t_process_connect(_) ->
{ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1} with_chan(fun(Channel) -> error('TODO') end).
= handle_in(?SUBSCRIBE_PACKET(1, #{}, TopicFilters), Channel),
Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>), t_handle_publish(_) ->
Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>), with_chan(fun(Channel) -> error('TODO') end).
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
{ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1), t_process_publish(_) ->
?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets]) with_chan(fun(Channel) -> error('TODO') end).
end).
t_process_subscribe(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_process_unsubscribe(_) ->
with_chan(fun(Channel) -> error('TODO') end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for handle_out %% Test cases for handle_out
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_out_connack(_) -> t_handle_out_delivers(_) ->
with_chan(fun(Channel) ->
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}],
{ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1}
= handle_in(?SUBSCRIBE_PACKET(1, #{}, TopicFilters), Channel),
Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>),
Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>),
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
{ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1),
?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets])
end).
t_handle_out_connack_sucess(_) ->
ConnPkt = #mqtt_packet_connect{ ConnPkt = #mqtt_packet_connect{
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
proto_ver = ?MQTT_PROTO_V4, proto_ver = ?MQTT_PROTO_V4,
@ -205,97 +242,209 @@ t_handle_out_connack(_) ->
properties = #{}, properties = #{},
clientid = <<"clientid">> clientid = <<"clientid">>
}, },
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, [{enter, connected},{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan}
{ok, [{enter, connected},{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan} = handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel),
= handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel), {stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
{stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel)
= handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel) end).
end).
t_handle_out_connack_failure(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_handle_out_publish(_) -> t_handle_out_publish(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)}, Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)}, {ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel),
{ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel), {ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel),
{ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel), {ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel),
{ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel), ?assertEqual(2, length(Packets)),
?assertEqual(2, length(Packets)), ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1))
?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1)) end).
end).
t_handle_out_puback(_) -> t_handle_out_puback(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, Channel} = handle_out({puberr, ?RC_NOT_AUTHORIZED}, Channel),
{ok, Channel} = handle_out({puberr, ?RC_NOT_AUTHORIZED}, Channel), {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Channel1}
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Channel1} = handle_out({puback, 1, ?RC_SUCCESS}, Channel),
= handle_out({puback, 1, ?RC_SUCCESS}, Channel), ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, Channel1))
?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, Channel1)) end).
end).
t_handle_out_pubrec(_) -> t_handle_out_pubrec(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, ?PUBREC_PACKET(4, ?RC_SUCCESS), Channel1}
{ok, ?PUBREC_PACKET(4, ?RC_SUCCESS), Channel1} = handle_out({pubrec, 4, ?RC_SUCCESS}, Channel),
= handle_out({pubrec, 4, ?RC_SUCCESS}, Channel), ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, Channel1))
?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, Channel1)) end).
end).
t_handle_out_pubrel(_) -> t_handle_out_pubrel(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, ?PUBREL_PACKET(2), Channel1}
{ok, ?PUBREL_PACKET(2), Channel1} = handle_out({pubrel, 2, ?RC_SUCCESS}, Channel),
= handle_out({pubrel, 2, ?RC_SUCCESS}, Channel), {ok, ?PUBREL_PACKET(3, ?RC_SUCCESS), Channel2}
{ok, ?PUBREL_PACKET(3, ?RC_SUCCESS), Channel2} = handle_out({pubrel, 3, ?RC_SUCCESS}, Channel1),
= handle_out({pubrel, 3, ?RC_SUCCESS}, Channel1), ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2))
?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)) end).
end).
t_handle_out_pubcomp(_) -> t_handle_out_pubcomp(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, ?PUBCOMP_PACKET(5, ?RC_SUCCESS), Channel1}
{ok, ?PUBCOMP_PACKET(5, ?RC_SUCCESS), Channel1} = handle_out({pubcomp, 5, ?RC_SUCCESS}, Channel),
= handle_out({pubcomp, 5, ?RC_SUCCESS}, Channel), ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1))
?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel1)) end).
end).
t_handle_out_suback(_) -> t_handle_out_suback(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel}
{ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel} = handle_out({suback, 1, [?QOS_2]}, Channel)
= handle_out({suback, 1, [?QOS_2]}, Channel) end).
end).
t_handle_out_unsuback(_) -> t_handle_out_unsuback(_) ->
with_channel( with_chan(fun(Channel) ->
fun(Channel) -> {ok, ?UNSUBACK_PACKET(1), Channel}
{ok, ?UNSUBACK_PACKET(1), Channel} = handle_out({unsuback, 1, [?RC_SUCCESS]}, Channel)
= handle_out({unsuback, 1, [?RC_SUCCESS]}, Channel) end).
end).
t_handle_out_disconnect(_) -> t_handle_out_disconnect(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
handle_out({disconnect, ?RC_SUCCESS}, Channel) handle_out({disconnect, ?RC_SUCCESS}, Channel)
end). end).
t_handle_out_unexpected(_) ->
with_chan(fun(Channel) ->
handle_out({disconnect, ?RC_SUCCESS}, Channel)
end).
%%--------------------------------------------------------------------
%% Test cases for handle_call
%%--------------------------------------------------------------------
t_handle_call_kick(_) ->
error('TODO').
t_handle_call_discard(_) ->
error('TODO').
t_handle_call_takeover(_) ->
error('TODO').
t_handle_call_unexpected(_) ->
error('TODO').
%%--------------------------------------------------------------------
%% Test cases for handle_info
%%--------------------------------------------------------------------
t_handle_info_subscribe(_) ->
error('TODO').
t_handle_info_unsubscribe(_) ->
error('TODO').
t_handle_info_sock_closed(_) ->
error('TODO').
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for handle_timeout %% Test cases for handle_timeout
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_timeout(_) -> t_handle_timeout_emit_stats(_) ->
with_channel( with_chan(fun(Channel) -> 'TODO' end).
fun(Channel) ->
'TODO' t_handle_timeout_keepalive(_) ->
end). with_chan(fun(Channel) -> 'TODO' end).
t_handle_timeout_retry_delivery(_) ->
with_chan(fun(Channel) -> 'TODO' end).
t_handle_timeout_expire_awaiting_rel(_) ->
with_chan(fun(Channel) -> 'TODO' end).
t_handle_timeout_expire_session(_) ->
with_chan(fun(Channel) -> 'TODO' end).
t_handle_timeout_will_message(_) ->
with_chan(fun(Channel) -> 'TODO' end).
%%--------------------------------------------------------------------
%% Test cases for ensure_timer
%%--------------------------------------------------------------------
t_ensure_timer(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_reset_timer(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_alive_timer(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_retry_timer(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_await_timer(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_expire_timer(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_will_timer(_) ->
with_chan(fun(Channel) -> error('TODO') end).
%%--------------------------------------------------------------------
%% Test cases for internal functions
%%--------------------------------------------------------------------
t_enrich_conninfo(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_enrich_client(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_check_banned(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_check_flapping(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_auth_connect(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_process_alias(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_check_pub_acl(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_check_pub_alias(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_check_subscribe(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_check_sub_acl(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_check_sub_caps(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_enrich_subid(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_enrich_subopts(_) ->
with_chan(fun(Channel) -> error('TODO') end).
t_enrich_caps(_) ->
with_chan(fun(Channel) -> error('TODO') end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for terminate %% Test cases for terminate
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_terminate(_) -> t_terminate(_) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
'TODO' 'TODO'
end). end).
@ -305,15 +454,15 @@ t_terminate(_) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
with_connected_channel(TestFun) -> with_connected_channel(TestFun) ->
with_channel( with_chan(
fun(Channel) -> fun(Channel) ->
TestFun(emqx_channel:set_field(conn_state, connected, Channel)) TestFun(emqx_channel:set_field(conn_state, connected, Channel))
end). end).
with_channel(TestFun) -> with_chan(TestFun) ->
with_channel(#{}, TestFun). with_chan(#{}, TestFun).
with_channel(ConnInfo, TestFun) -> with_chan(ConnInfo, TestFun) ->
ConnInfo1 = maps:merge(?DEFAULT_CONNINFO, ConnInfo), ConnInfo1 = maps:merge(?DEFAULT_CONNINFO, ConnInfo),
ClientInfo = #{zone => <<"external">>, ClientInfo = #{zone => <<"external">>,
protocol => mqtt, protocol => mqtt,

View File

@ -19,57 +19,310 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(Transport, esockd_transport). -define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg,
recv_oct, recv_cnt, send_oct, send_cnt,
send_pend
]).
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
init_per_suite(Config) -> init_per_suite(Config) ->
ok = meck:new(esockd_transport, [passthrough, no_history]),
ok = meck:new(emqx_channel, [passthrough, no_history]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok. ok.
t_start_link_error(_) -> init_per_testcase(_TestCase, Config) ->
process_flag(trap_exit, true), %% Meck Transport
ok = meck:expect(esockd_transport, wait, fun(_Sock) -> {error, enotconn} end), ok = meck:new(emqx_transport, [non_strict, passthrough, no_history]),
ok = meck:expect(esockd_transport, fast_close, fun(_Sock) -> ok end), ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end),
{ok, Pid} = emqx_connection:start_link(esockd_transport, socket, []), ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end),
timer:sleep(100), ok = meck:expect(emqx_transport, ensure_ok_or_exit,
?assertNot(erlang:is_process_alive(Pid)), fun(peername, [sock]) -> {ok, {{127,0,0,1}, 3456}};
?assertEqual([{'EXIT', Pid, normal}], proc_mailbox()). (sockname, [sock]) -> {ok, {{127,0,0,1}, 1883}};
(peercert, [sock]) -> undefined
end),
ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end),
ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
{ok, [{K, 0} || K <- Options]}
end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
%% Meck Channel
ok = meck:new(emqx_channel, [passthrough, no_history]),
%% Meck Metrics
ok = meck:new(emqx_metrics, [passthrough, no_history]),
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
Config.
todo_t_basic(_) -> end_per_testcase(_TestCase, Config) ->
Topic = <<"TopicA">>, ok = meck:unload(emqx_transport),
{ok, C} = emqtt:start_link([{port, 1883}, {clientid, <<"hello">>}]), ok = meck:unload(emqx_channel),
{ok, _} = emqtt:connect(C), ok = meck:unload(emqx_metrics),
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), Config.
{ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
?assertEqual(3, length(recv_msgs(3))),
ok = emqtt:disconnect(C).
proc_mailbox() -> %%--------------------------------------------------------------------
proc_mailbox(self()). %% Test cases
proc_mailbox(Pid) -> %%--------------------------------------------------------------------
{messages, Msgs} = erlang:process_info(Pid, messages),
Msgs.
recv_msgs(Count) -> t_start_link_ok(_) ->
recv_msgs(Count, []). with_connection(fun(CPid) ->
state = element(1, sys:get_state(CPid))
end).
recv_msgs(0, Msgs) -> t_start_link_exit_on_wait(_) ->
Msgs; ok = exit_on_wait_error(enotconn, normal),
recv_msgs(Count, Msgs) -> ok = exit_on_wait_error(einval, normal),
ok = exit_on_wait_error(closed, normal),
ok = exit_on_wait_error(timeout, {shutdown, ssl_upgrade_timeout}),
ok = exit_on_wait_error(enetdown, {shutdown, enetdown}).
t_start_link_exit_on_activate(_) ->
ok = exit_on_activate_error(enotconn, normal),
ok = exit_on_activate_error(einval, normal),
ok = exit_on_activate_error(closed, normal),
ok = exit_on_activate_error(econnreset, {shutdown, econnreset}).
t_get_conn_info(_) ->
with_connection(fun(CPid) ->
#{sockinfo := SockInfo} = emqx_connection:info(CPid),
?assertEqual(#{active_n => 100,
peername => {{127,0,0,1},3456},
pub_limit => undefined,
rate_limit => undefined,
sockname => {{127,0,0,1},1883},
sockstate => running,
socktype => tcp}, SockInfo)
end).
t_get_conn_stats(_) ->
with_connection(fun(CPid) ->
Stats = emqx_connection:stats(CPid),
lists:foreach(fun(Key) ->
0 = proplists:get_value(Key, Stats)
end, ?STATS_KYES)
end).
t_handle_call_discard(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_call,
fun(discard, Channel) ->
{shutdown, discarded, ok, Channel}
end),
ok = emqx_connection:call(CPid, discard),
timer:sleep(100),
ok = trap_exit(CPid, {shutdown, discarded})
end, #{trap_exit => true}).
t_handle_call_takeover(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_call,
fun({takeover, 'begin'}, Channel) ->
{reply, session, Channel};
({takeover, 'end'}, Channel) ->
{shutdown, takeovered, [], Channel}
end),
session = emqx_connection:call(CPid, {takeover, 'begin'}),
[] = emqx_connection:call(CPid, {takeover, 'end'}),
timer:sleep(100),
ok = trap_exit(CPid, {shutdown, takeovered})
end, #{trap_exit => true}).
t_handle_call_any(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_call,
fun(_Req, Channel) -> {reply, ok, Channel} end),
ok = emqx_connection:call(CPid, req)
end).
t_handle_incoming_connect(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
ConnPkt = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
proto_name = <<"MQTT">>,
clientid = <<>>,
clean_start = true,
keepalive = 60
},
Frame = make_frame(?CONNECT_PACKET(ConnPkt)),
CPid ! {tcp, sock, Frame}
end).
t_handle_incoming_publish(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)),
CPid ! {tcp, sock, Frame}
end).
t_handle_incoming_subscribe(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
Frame = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
CPid ! {tcp, sock, Frame}
end).
t_handle_incoming_unsubscribe(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
Frame = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
CPid ! {tcp, sock, Frame}
end).
t_handle_sock_error(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_info,
fun({_, Reason}, Channel) ->
{shutdown, Reason, Channel}
end),
%% TODO: fixme later
CPid ! {tcp_error, sock, econnreset},
timer:sleep(100),
trap_exit(CPid, {shutdown, econnreset})
end, #{trap_exit => true}).
t_handle_sock_passive(_) ->
with_connection(fun(CPid) -> CPid ! {tcp_passive, sock} end).
t_handle_sock_activate(_) ->
with_connection(fun(CPid) -> CPid ! activate_socket end).
t_handle_sock_closed(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_info,
fun({sock_closed, Reason}, Channel) ->
{shutdown, Reason, Channel}
end),
CPid ! {tcp_closed, sock},
timer:sleep(100),
%%TODO: closed?
trap_exit(CPid, {shutdown, closed})
end, #{trap_exit => true}).
t_handle_outgoing(_) ->
with_connection(fun(CPid) ->
Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
CPid ! {outgoing, Publish},
CPid ! {outgoing, ?PUBREL_PACKET(1)},
CPid ! {outgoing, [?PUBCOMP_PACKET(1)]}
end).
t_conn_rate_limit(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
lists:foreach(fun(I) ->
Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)),
CPid ! {tcp, sock, make_frame(Publish)}
end, [1, 2])
%%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
end, #{active_n => 1, rate_limit => {1, 1024}}).
t_conn_pub_limit(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
ok = lists:foreach(fun(I) ->
CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)}
end, lists:seq(1, 3))
%%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
end, #{active_n => 1, publish_limit => {1, 2}}).
t_oom_shutdown(_) ->
with_connection(fun(CPid) ->
CPid ! {shutdown, message_queue_too_long},
timer:sleep(100),
trap_exit(CPid, {shutdown, message_queue_too_long})
end, #{trap_exit => true}).
t_handle_idle_timeout(_) ->
ok = emqx_zone:set_env(external, idle_timeout, 10),
with_connection(fun(CPid) ->
timer:sleep(100),
trap_exit(CPid, {shutdown, idle_timeout})
end, #{zone => external, trap_exit => true}).
t_handle_emit_stats(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_timeout,
fun(_TRef, _TMsg, Channel) ->
{ok, Channel}
end),
CPid ! {timeout, make_ref(), emit_stats}
end).
t_handle_keepalive_timeout(_) ->
with_connection(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_timeout,
fun(_TRef, _TMsg, Channel) ->
{shutdown, keepalive_timeout, Channel}
end),
CPid ! {timeout, make_ref(), keepalive},
timer:sleep(100),
trap_exit(CPid, {shutdown, keepalive_timeout})
end, #{trap_exit => true}).
t_handle_shutdown(_) ->
with_connection(fun(CPid) ->
CPid ! Shutdown = {shutdown, reason},
timer:sleep(100),
trap_exit(CPid, Shutdown)
end, #{trap_exit => true}).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
exit_on_wait_error(SockErr, Reason) ->
ok = meck:expect(emqx_transport, wait,
fun(_Sock) ->
{error, SockErr}
end),
with_connection(fun(CPid) ->
timer:sleep(100),
trap_exit(CPid, Reason)
end, #{trap_exit => true}).
exit_on_activate_error(SockErr, Reason) ->
ok = meck:expect(emqx_transport, setopts,
fun(_Sock, _Opts) ->
{error, SockErr}
end),
with_connection(fun(CPid) ->
timer:sleep(100),
trap_exit(CPid, Reason)
end, #{trap_exit => true}).
with_connection(TestFun) ->
with_connection(TestFun, #{trap_exit => false}).
with_connection(TestFun, Options) when is_map(Options) ->
with_connection(TestFun, maps:to_list(Options));
with_connection(TestFun, Options) ->
TrapExit = proplists:get_value(trap_exit, Options, false),
process_flag(trap_exit, TrapExit),
{ok, CPid} = emqx_connection:start_link(emqx_transport, sock, Options),
TestFun(CPid),
TrapExit orelse emqx_connection:stop(CPid),
ok.
trap_exit(Pid, Reason) ->
receive receive
{publish, Msg} -> {'EXIT', Pid, Reason} -> ok;
recv_msgs(Count-1, [Msg|Msgs]) {'EXIT', Pid, Other} -> error({unexpect_exit, Other})
after 100 -> after
Msgs 0 -> error({expect_exit, Reason})
end. end.
make_frame(Packet) ->
iolist_to_binary(emqx_frame:serialize(Packet)).
payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).

View File

@ -23,304 +23,81 @@
-include_lib("proper/include/proper.hrl"). -include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(mock_modules,
[ emqx_metrics
, emqx_broker
, emqx_misc
, emqx_message
, emqx_hooks
, emqx_zone
]).
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
t_proper_session(_) -> t_session_init(_) ->
Opts = [{numtests, 100}, {to_file, user}], error('TODO').
ok = emqx_logger:set_log_level(emergency),
ok = before_proper(),
?assert(proper:quickcheck(prop_session(), Opts)),
ok = after_proper().
before_proper() -> %%--------------------------------------------------------------------
load(?mock_modules). %% Test cases for info/stats
%%--------------------------------------------------------------------
after_proper() -> t_session_info(_) ->
unload(?mock_modules), error('TODO').
emqx_logger:set_log_level(error).
prop_session() -> t_session_attrs(_) ->
?FORALL({Session, OpList}, {session(), session_op_list()}, error('TODO').
begin
try
apply_ops(Session, OpList),
true
after
true
end
end).
%%%%%%%%%%%%%%% t_session_stats(_) ->
%%% Helpers %%% error('TODO').
%%%%%%%%%%%%%%%
apply_ops(Session, []) -> %%--------------------------------------------------------------------
?assertEqual(session, element(1, Session)); %% Test cases for pub/sub
apply_ops(Session, [Op | Rest]) -> %%--------------------------------------------------------------------
NSession = apply_op(Session, Op),
apply_ops(NSession, Rest).
apply_op(Session, info) -> t_subscribe(_) ->
Info = emqx_session:info(Session), error('TODO').
?assert(is_map(Info)),
?assert(maps:size(Info) > 0),
Session;
apply_op(Session, attrs) ->
Attrs = emqx_session:attrs(Session),
?assert(is_map(Attrs)),
?assert(maps:size(Attrs) > 0),
Session;
apply_op(Session, stats) ->
Stats = emqx_session:stats(Session),
?assert(is_list(Stats)),
?assert(length(Stats) > 0),
Session;
apply_op(Session, {info, InfoArg}) ->
_Ret = emqx_session:info(InfoArg, Session),
Session;
apply_op(Session, {subscribe, {Client, TopicFilter, SubOpts}}) ->
case emqx_session:subscribe(Client, TopicFilter, SubOpts, Session) of
{ok, NSession} ->
NSession;
{error, ?RC_QUOTA_EXCEEDED} ->
Session
end;
apply_op(Session, {unsubscribe, {Client, TopicFilter}}) ->
case emqx_session:unsubscribe(Client, TopicFilter, Session) of
{ok, NSession} ->
NSession;
{error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
Session
end;
apply_op(Session, {publish, {PacketId, Msg}}) ->
case emqx_session:publish(PacketId, Msg, Session) of
{ok, _Msg} ->
Session;
{ok, _Deliver, NSession} ->
NSession;
{error, _ErrorCode} ->
Session
end;
apply_op(Session, {puback, PacketId}) ->
case emqx_session:puback(PacketId, Session) of
{ok, _Msg, NSession} ->
NSession;
{ok, _Msg, _Publishes, NSession} ->
NSession;
{error, _ErrorCode} ->
Session
end;
apply_op(Session, {pubrec, PacketId}) ->
case emqx_session:pubrec(PacketId, Session) of
{ok, _Msg, NSession} ->
NSession;
{error, _ErrorCode} ->
Session
end;
apply_op(Session, {pubrel, PacketId}) ->
case emqx_session:pubrel(PacketId, Session) of
{ok, NSession} ->
NSession;
{error, _ErrorCode} ->
Session
end;
apply_op(Session, {pubcomp, PacketId}) ->
case emqx_session:pubcomp(PacketId, Session) of
{ok, _Msgs} ->
Session;
{ok, _Msgs, NSession} ->
NSession;
{error, _ErrorCode} ->
Session
end;
apply_op(Session, {deliver, Delivers}) ->
{ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session),
NSession.
%%%%%%%%%%%%%%%%%% t_unsubscribe(_) ->
%%% Generators %%% error('TODO').
%%%%%%%%%%%%%%%%%%
session_op_list() ->
Union = [info,
attrs,
stats,
{info, info_args()},
{subscribe, sub_args()},
{unsubscribe, unsub_args()},
{publish, publish_args()},
{puback, puback_args()},
{pubrec, pubrec_args()},
{pubrel, pubrel_args()},
{pubcomp, pubcomp_args()},
{deliver, deliver_args()}
],
list(?LAZY(oneof(Union))).
deliver_args() -> t_publish_qos0(_) ->
list({deliver, topic(), message()}). error('TODO').
info_args() -> t_publish_qos1(_) ->
oneof([subscriptions, error('TODO').
subscriptions_max,
upgrade_qos,
inflight,
inflight_max,
retry_interval,
mqueue_len,
mqueue_max,
mqueue_dropped,
next_pkt_id,
awaiting_rel,
awaiting_rel_max,
await_rel_timeout,
created_at
]).
sub_args() -> t_publish_qos2(_) ->
?LET({ClientId, TopicFilter, SubOpts}, error('TODO').
{clientid(), topic(), sub_opts()},
{#{clientid => ClientId}, TopicFilter, SubOpts}).
unsub_args() -> t_puback(_) ->
?LET({ClientId, TopicFilter}, error('TODO').
{clientid(), topic()},
{#{clientid => ClientId}, TopicFilter}).
publish_args() -> t_pubrec(_) ->
?LET({PacketId, Message}, error('TODO').
{packetid(), message()},
{PacketId, Message}).
puback_args() -> t_pubrel(_) ->
packetid(). error('TODO').
pubrec_args() -> t_pubcomp(_) ->
packetid(). error('TODO').
pubrel_args() -> %%--------------------------------------------------------------------
packetid(). %% Test cases for deliver/retry
%%--------------------------------------------------------------------
pubcomp_args() -> t_deliver(_) ->
packetid(). error('TODO').
sub_opts() -> t_enqueue(_) ->
?LET({RH, RAP, NL, QOS, SHARE, SUBID}, error('TODO').
{rh(), rap(), nl(), qos(), share(), subid()}
, make_subopts(RH, RAP, NL, QOS, SHARE, SUBID)).
message() -> t_retry(_) ->
?LET({QoS, Topic, Payload}, error('TODO').
{qos(), topic(), payload()},
emqx_message:make(proper, QoS, Topic, Payload)).
subid() -> integer(). %%--------------------------------------------------------------------
%% Test cases for takeover/resume
%%--------------------------------------------------------------------
rh() -> oneof([0, 1, 2]). t_takeover(_) ->
error('TODO').
rap() -> oneof([0, 1]). t_resume(_) ->
error('TODO').
nl() -> oneof([0, 1]). t_redeliver(_) ->
error('TODO').
qos() -> oneof([0, 1, 2]). t_expire(_) ->
error('TODO').
share() -> binary().
clientid() -> binary().
topic() -> ?LET(No, choose(1, 10),
begin
NoBin = integer_to_binary(No),
<<"topic/", NoBin/binary>>
end).
payload() -> binary().
packetid() -> choose(1, 30).
zone() ->
?LET(Zone, [{max_subscriptions, max_subscription()},
{upgrade_qos, upgrade_qos()},
{retry_interval, retry_interval()},
{max_awaiting_rel, max_awaiting_rel()},
{await_rel_timeout, await_rel_timeout()}]
, maps:from_list(Zone)).
max_subscription() ->
frequency([{33, 0},
{33, 1},
{34, choose(0,10)}]).
upgrade_qos() -> bool().
retry_interval() -> ?LET(Interval, choose(0, 20), Interval*1000).
max_awaiting_rel() -> choose(0, 10).
await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000).
max_inflight() -> choose(0, 10).
option() ->
?LET(Option, [{receive_maximum , max_inflight()}],
maps:from_list(Option)).
session() ->
?LET({Zone, Options},
{zone(), option()},
begin
Session = emqx_session:init(#{zone => Zone}, Options),
emqx_session:set_field(next_pkt_id, 16#ffff, Session)
end).
%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Internal functions %%%
%%%%%%%%%%%%%%%%%%%%%%%%%%
make_subopts(RH, RAP, NL, QOS, SHARE, SubId) ->
#{rh => RH,
rap => RAP,
nl => NL,
qos => QOS,
share => SHARE,
subid => SubId}.
load(Modules) ->
[mock(Module) || Module <- Modules],
ok.
unload(Modules) ->
lists:foreach(fun(Module) ->
ok = meck:unload(Module)
end, Modules).
mock(Module) ->
ok = meck:new(Module, [passthrough, no_history]),
do_mock(Module).
do_mock(emqx_metrics) ->
meck:expect(emqx_metrics, inc, fun(_Anything) -> ok end);
do_mock(emqx_broker) ->
meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end),
meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
meck:expect(emqx_broker, publish, fun(_) -> ok end);
do_mock(emqx_misc) ->
meck:expect(emqx_misc, start_timer, fun(_, _) -> tref end);
do_mock(emqx_message) ->
meck:expect(emqx_message, set_header, fun(_Hdr, _Val, Msg) -> Msg end),
meck:expect(emqx_message, is_expired, fun(_Msg) -> (rand:uniform(16) > 8) end);
do_mock(emqx_hooks) ->
meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end);
do_mock(emqx_zone) ->
meck:expect(emqx_zone, get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end).

View File

@ -16,42 +16,204 @@
-module(emqx_ws_connection_SUITE). -module(emqx_ws_connection_SUITE).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -import(emqx_ws_connection,
[ websocket_handle/2
, websocket_info/2
]).
-define(STATS_KEYS, [recv_oct, recv_cnt, send_oct, send_cnt,
recv_pkt, recv_msg, send_pkt, send_msg
]).
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). ok.
t_basic(_) -> init_per_testcase(_TestCase, Config) ->
Topic = <<"TopicA">>, %% Meck CowboyReq
{ok, C} = emqtt:start_link([{host, "127.0.0.1"}, {port, 8083}]), ok = meck:new(cowboy_req, [passthrough, no_history]),
{ok, _} = emqtt:ws_connect(C), ok = meck:expect(cowboy_req, peer, fun(_) -> {{127,0,0,1}, 3456} end),
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), ok = meck:expect(cowboy_req, sock, fun(_) -> {{127,0,0,1}, 8883} end),
{ok, _, [2]} = emqtt:subscribe(C, Topic, qos2), ok = meck:expect(cowboy_req, cert, fun(_) -> undefined end),
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), ok = meck:expect(cowboy_req, parse_cookies, fun(_) -> undefined end),
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), %% Meck Channel
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), ok = meck:new(emqx_channel, [passthrough, no_history]),
?assertEqual(3, length(recv_msgs(3))), ok = meck:expect(emqx_channel, recvd,
ok = emqtt:disconnect(C). fun(_Oct, Channel) ->
{ok, Channel}
end),
%% Meck Metrics
ok = meck:new(emqx_metrics, [passthrough, no_history]),
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
Config.
recv_msgs(Count) -> end_per_testcase(_TestCase, Config) ->
recv_msgs(Count, []). ok = meck:unload(cowboy_req),
ok = meck:unload(emqx_channel),
ok = meck:unload(emqx_metrics),
Config.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
%%TODO:...
t_ws_conn_init(_) ->
with_ws_conn(fun(_WsConn) -> ok end).
t_ws_conn_info(_) ->
with_ws_conn(fun(WsConn) ->
#{sockinfo := SockInfo} = emqx_ws_connection:info(WsConn),
#{socktype := ws,
peername := {{127,0,0,1}, 3456},
sockname := {{127,0,0,1}, 8883},
sockstate := idle} = SockInfo
end).
t_ws_conn_stats(_) ->
with_ws_conn(fun(WsConn) ->
Stats = emqx_ws_connection:stats(WsConn),
lists:foreach(fun(Key) ->
0 = proplists:get_value(Key, Stats)
end, ?STATS_KEYS)
end).
t_websocket_init(_) ->
with_ws_conn(fun(WsConn) ->
#{sockinfo := SockInfo} = emqx_ws_connection:info(WsConn),
#{socktype := ws,
peername := {{127,0,0,1}, 3456},
sockname := {{127,0,0,1}, 8883},
sockstate := idle
} = SockInfo
end).
t_websocket_handle_binary(_) ->
with_ws_conn(fun(WsConn) ->
ok = meck:expect(emqx_channel, recvd, fun(_Oct, Channel) -> {ok, Channel} end),
{ok, WsConn} = websocket_handle({binary, [<<>>]}, WsConn)
end).
t_websocket_handle_ping_pong(_) ->
with_ws_conn(fun(WsConn) ->
{ok, WsConn} = websocket_handle(ping, WsConn),
{ok, WsConn} = websocket_handle(pong, WsConn),
{ok, WsConn} = websocket_handle({ping, <<>>}, WsConn),
{ok, WsConn} = websocket_handle({pong, <<>>}, WsConn)
end).
t_websocket_handle_bad_frame(_) ->
with_ws_conn(fun(WsConn) ->
{stop, {shutdown, unexpected_ws_frame}, WsConn}
= websocket_handle({badframe, <<>>}, WsConn)
end).
t_websocket_info_call(_) ->
with_ws_conn(fun(WsConn) ->
From = {make_ref(), self()},
Call = {call, From, badreq},
websocket_info(Call, WsConn)
end).
t_websocket_info_cast(_) ->
with_ws_conn(fun(WsConn) ->
websocket_info({cast, msg}, WsConn)
end).
t_websocket_info_incoming(_) ->
with_ws_conn(fun(WsConn) ->
Connect = ?CONNECT_PACKET(
#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
proto_name = <<"MQTT">>,
clientid = <<>>,
clean_start = true,
keepalive = 60}),
{ok, WsConn1} = websocket_info({incoming, Connect}, WsConn),
Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
{ok, WsConn2} = websocket_info({incoming, Publish}, WsConn1)
end).
t_websocket_info_deliver(_) ->
with_ws_conn(fun(WsConn) ->
Msg = emqx_message:make(<<"topic">>, <<"payload">>),
Deliver = {deliver, <<"#">>, Msg},
{ok, WsConn1} = websocket_info(Deliver, WsConn)
end).
t_websocket_info_timeout(_) ->
with_ws_conn(fun(WsConn) ->
websocket_info({timeout, make_ref(), keepalive}, WsConn),
websocket_info({timeout, make_ref(), emit_stats}, WsConn),
websocket_info({timeout, make_ref(), retry_delivery}, WsConn)
end).
t_websocket_info_close(_) ->
with_ws_conn(fun(WsConn) ->
{stop, {shutdown, sock_error}, WsConn} = websocket_info({close, sock_error}, WsConn)
end).
t_websocket_info_shutdown(_) ->
with_ws_conn(fun(WsConn) ->
{stop, {shutdown, reason}, WsConn} = websocket_info({shutdown, reason}, WsConn)
end).
t_websocket_info_stop(_) ->
with_ws_conn(fun(WsConn) ->
{stop, normal, WsConn} = websocket_info({stop, normal}, WsConn)
end).
t_websocket_close(_) ->
with_ws_conn(fun(WsConn) ->
{stop, sock_closed, WsConn}
= emqx_ws_connection:websocket_close(badframe, WsConn)
end).
t_handle_call(_) ->
with_ws_conn(fun(WsConn) -> ok end).
t_handle_info(_) ->
with_ws_conn(fun(WsConn) -> ok end).
t_handle_timeout(_) ->
with_ws_conn(fun(WsConn) -> ok end).
t_parse_incoming(_) ->
with_ws_conn(fun(WsConn) -> ok end).
t_handle_incoming(_) ->
with_ws_conn(fun(WsConn) -> ok end).
t_handle_return(_) ->
with_ws_conn(fun(WsConn) -> ok end).
t_handle_outgoing(_) ->
with_ws_conn(fun(WsConn) -> ok end).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
with_ws_conn(TestFun) ->
with_ws_conn(TestFun, []).
with_ws_conn(TestFun, Opts) ->
{ok, WsConn} = emqx_ws_connection:websocket_init(
[req, emqx_misc:merge_opts([{zone, external}], Opts)]),
TestFun(WsConn).
recv_msgs(0, Msgs) ->
Msgs;
recv_msgs(Count, Msgs) ->
receive
{publish, Msg} ->
recv_msgs(Count-1, [Msg|Msgs])
after 100 ->
Msgs
end.