Merge pull request #2975 from emqx/master

Auto-pull-request-by-2019-10-14
This commit is contained in:
Shawn 2019-10-14 21:54:57 +08:00 committed by GitHub
commit fee9a7228b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 688 additions and 652 deletions

View File

@ -554,6 +554,11 @@ mqtt.ignore_loop_deliver = false
## Value: duration ## Value: duration
zone.external.idle_timeout = 15s zone.external.idle_timeout = 15s
## Hibernate after a duration of idle state.
##
## Value: duration
zone.external.hibernate_after = 60s
## Publish limit for the external MQTT connections. ## Publish limit for the external MQTT connections.
## ##
## Value: Number,Duration ## Value: Number,Duration

View File

@ -725,6 +725,12 @@ end}.
{datatype, {duration, ms}} {datatype, {duration, ms}}
]}. ]}.
%% @doc Hibernate after a duration of idle state.
{mapping, "zone.$name.hibernate_after", "emqx.zones", [
{default, "60s"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.allow_anonymous", "emqx.zones", [ {mapping, "zone.$name.allow_anonymous", "emqx.zones", [
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.

View File

@ -31,7 +31,7 @@
, caps/1 , caps/1
]). ]).
%% Exports for unit tests:( %% Test Exports
-export([set_field/3]). -export([set_field/3]).
-export([ init/2 -export([ init/2
@ -40,14 +40,9 @@
, handle_call/2 , handle_call/2
, handle_info/2 , handle_info/2
, handle_timeout/3 , handle_timeout/3
, disconnect/2
, terminate/2 , terminate/2
]). ]).
-export([ received/2
, sent/2
]).
-import(emqx_misc, -import(emqx_misc,
[ run_fold/3 [ run_fold/3
, pipeline/3 , pipeline/3
@ -75,8 +70,8 @@
pub_stats :: emqx_types:stats(), pub_stats :: emqx_types:stats(),
%% Timers %% Timers
timers :: #{atom() => disabled | maybe(reference())}, timers :: #{atom() => disabled | maybe(reference())},
%% Fsm State %% Conn State
state :: fsm_state(), conn_state :: conn_state(),
%% GC State %% GC State
gc_state :: maybe(emqx_gc:gc_state()), gc_state :: maybe(emqx_gc:gc_state()),
%% Takeover %% Takeover
@ -89,13 +84,14 @@
-opaque(channel() :: #channel{}). -opaque(channel() :: #channel{}).
-type(fsm_state() :: #{state_name := initialized -type(conn_state() :: idle | connecting | connected | disconnected).
| connecting
| connected -type(action() :: {enter, connected | disconnected}
| disconnected, | {close, Reason :: atom()}
connected_at := pos_integer(), | {outgoing, emqx_types:packet()}
disconnected_at := pos_integer() | {outgoing, [emqx_types:packet()]}).
}).
-type(output() :: emqx_types:packet() | action() | [action()]).
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
stats_timer => emit_stats, stats_timer => emit_stats,
@ -106,7 +102,7 @@
will_timer => will_message will_timer => will_message
}). }).
-define(ATTR_KEYS, [conninfo, clientinfo, state, session]). -define(ATTR_KEYS, [conninfo, clientinfo, session, conn_state]).
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, -define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases,
alias_maximum, gc_state]). alias_maximum, gc_state]).
@ -129,8 +125,8 @@ info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo; ClientInfo;
info(session, #channel{session = Session}) -> info(session, #channel{session = Session}) ->
maybe_apply(fun emqx_session:info/1, Session); maybe_apply(fun emqx_session:info/1, Session);
info(state, #channel{state = State}) -> info(conn_state, #channel{conn_state = ConnState}) ->
State; ConnState;
info(keepalive, #channel{keepalive = Keepalive}) -> info(keepalive, #channel{keepalive = Keepalive}) ->
maybe_apply(fun emqx_keepalive:info/1, Keepalive); maybe_apply(fun emqx_keepalive:info/1, Keepalive);
info(topic_aliases, #channel{topic_aliases = Aliases}) -> info(topic_aliases, #channel{topic_aliases = Aliases}) ->
@ -157,6 +153,8 @@ attrs(session, #channel{session = Session}) ->
attrs(Key, Channel) -> info(Key, Channel). attrs(Key, Channel) -> info(Key, Channel).
-spec(stats(channel()) -> emqx_types:stats()). -spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{pub_stats = PubStats, session = undefined}) ->
maps:to_list(PubStats);
stats(#channel{pub_stats = PubStats, session = Session}) -> stats(#channel{pub_stats = PubStats, session = Session}) ->
maps:to_list(PubStats) ++ emqx_session:stats(Session). maps:to_list(PubStats) ++ emqx_session:stats(Session).
@ -204,7 +202,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
clientinfo = ClientInfo, clientinfo = ClientInfo,
pub_stats = #{}, pub_stats = #{},
timers = #{stats_timer => StatsTimer}, timers = #{stats_timer => StatsTimer},
state = #{state_name => initialized}, conn_state = idle,
gc_state = init_gc_state(Zone), gc_state = init_gc_state(Zone),
takeover = false, takeover = false,
resuming = false, resuming = false,
@ -221,15 +219,16 @@ init_gc_state(Zone) ->
%% Handle incoming packet %% Handle incoming packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(handle_in(emqx_types:packet(), channel()) -spec(handle_in(Bytes :: pos_integer() | emqx_types:packet(), channel())
-> {ok, channel()} -> {ok, channel()}
| {ok, emqx_types:packet(), channel()} | {ok, output(), channel()}
| {ok, list(emqx_types:packet()), channel()} | {stop, Reason :: term(), channel()}
| {close, channel()} | {stop, Reason :: term(), output(), channel()}).
| {close, emqx_types:packet(), channel()} handle_in(Bytes, Channel) when is_integer(Bytes) ->
| {stop, Error :: term(), channel()} NChannel = maybe_gc_and_check_oom(Bytes, Channel),
| {stop, Error :: term(), emqx_types:packet(), channel()}). {ok, ensure_timer(stats_timer, NChannel)};
handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) ->
handle_in(?CONNECT_PACKET(_), Channel = #channel{conn_state = connected}) ->
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
@ -243,35 +242,36 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
{ok, NConnPkt, NChannel} -> {ok, NConnPkt, NChannel} ->
process_connect(NConnPkt, NChannel); process_connect(NConnPkt, NChannel);
{error, ReasonCode, NChannel} -> {error, ReasonCode, NChannel} ->
handle_out({connack, emqx_reason_codes:formalized(connack, ReasonCode), ConnPkt}, NChannel) ReasonName = emqx_reason_codes:formalized(connack, ReasonCode),
handle_out({connack, ReasonName, ConnPkt}, NChannel)
end; end;
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
Channel1 = inc_pub_stats(publish_in, Channel), NChannel = inc_pub_stats(publish_in, Channel),
case emqx_packet:check(Packet) of case emqx_packet:check(Packet) of
ok -> handle_publish(Packet, Channel1); ok -> handle_publish(Packet, NChannel);
{error, ReasonCode} -> {error, ReasonCode} ->
handle_out({disconnect, ReasonCode}, Channel1) handle_out({disconnect, ReasonCode}, NChannel)
end; end;
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
Channel = #channel{clientinfo = ClientInfo, session = Session}) -> Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
Channel1 = inc_pub_stats(puback_in, Channel), NChannel = inc_pub_stats(puback_in, Channel),
case emqx_session:puback(PacketId, Session) of case emqx_session:puback(PacketId, Session) of
{ok, Msg, Publishes, NSession} -> {ok, Msg, Publishes, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
handle_out({publish, Publishes}, Channel1#channel{session = NSession}); handle_out({publish, Publishes}, NChannel#channel{session = NSession});
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
{ok, Channel1#channel{session = NSession}}; {ok, NChannel#channel{session = NSession}};
{error, ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.puback.inuse'), ok = emqx_metrics:inc('packets.puback.inuse'),
{ok, Channel1}; {ok, NChannel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'), ok = emqx_metrics:inc('packets.puback.missed'),
{ok, Channel1} {ok, NChannel}
end; end;
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
@ -357,27 +357,27 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
OldInterval == 0 andalso Interval > OldInterval -> OldInterval == 0 andalso Interval > OldInterval ->
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel1); handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel1);
Interval == 0 -> Interval == 0 ->
{stop, ReasonName, Channel1}; shutdown(ReasonName, Channel1);
true -> true ->
Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}}, Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}},
{close, ReasonName, Channel2} {ok, {close, ReasonName}, Channel2}
end; end;
handle_in(?AUTH_PACKET(), Channel) -> handle_in(?AUTH_PACKET(), Channel) ->
handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel); handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) -> handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
case FsmState of shutdown(Reason, Channel);
#{state_name := initialized} ->
{stop, {shutdown, Reason}, Channel}; handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
#{state_name := connecting} -> shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
{stop, {shutdown, Reason}, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel};
#{state_name := connected} -> handle_in({frame_error, _Reason}, Channel = #channel{conn_state = connected}) ->
handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel); handle_out({disconnect, ?RC_MALFORMED_PACKET}, Channel);
#{state_name := disconnected} ->
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?LOG(error, "Unexpected frame error: ~p", [Reason]), ?LOG(error, "Unexpected frame error: ~p", [Reason]),
{ok, Channel} {ok, Channel};
end;
handle_in(Packet, Channel) -> handle_in(Packet, Channel) ->
?LOG(error, "Unexpected incoming: ~p", [Packet]), ?LOG(error, "Unexpected incoming: ~p", [Packet]),
@ -528,32 +528,59 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
%% Handle outgoing packet %% Handle outgoing packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%TODO: RunFold or Pipeline -spec(handle_out(integer()|term(), channel())
-> {ok, channel()}
| {ok, output(), channel()}
| {stop, Reason :: term(), channel()}
| {stop, Reason :: term(), output(), channel()}).
handle_out(Bytes, Channel) when is_integer(Bytes) ->
NChannel = maybe_gc_and_check_oom(Bytes, Channel),
{ok, ensure_timer(stats_timer, NChannel)};
handle_out(Delivers, Channel = #channel{conn_state = disconnected,
session = Session})
when is_list(Delivers) ->
NSession = emqx_session:enqueue(Delivers, Session),
{ok, Channel#channel{session = NSession}};
handle_out(Delivers, Channel = #channel{takeover = true,
pendings = Pendings})
when is_list(Delivers) ->
{ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}};
handle_out(Delivers, Channel = #channel{session = Session}) when is_list(Delivers) ->
case emqx_session:deliver(Delivers, Session) of
{ok, Publishes, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out({publish, Publishes}, ensure_timer(retry_timer, NChannel));
{ok, NSession} ->
{ok, Channel#channel{session = NSession}}
end;
handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
Channel = #channel{conninfo = ConnInfo, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo, clientinfo = ClientInfo}) ->
state = FsmState}) ->
AckProps = run_fold([fun enrich_caps/2, AckProps = run_fold([fun enrich_caps/2,
fun enrich_server_keepalive/2, fun enrich_server_keepalive/2,
fun enrich_assigned_clientid/2], #{}, Channel), fun enrich_assigned_clientid/2], #{}, Channel),
FsmState1 = FsmState#{state_name => connected, ConnInfo1 = ConnInfo#{connected_at => erlang:system_time(second)},
connected_at => erlang:system_time(second) Channel1 = Channel#channel{conninfo = ConnInfo1,
},
Channel1 = Channel#channel{state = FsmState1,
will_msg = emqx_packet:will_msg(ConnPkt), will_msg = emqx_packet:will_msg(ConnPkt),
conn_state = connected,
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo) alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
}, },
Channel2 = ensure_keepalive(AckProps, Channel1), Channel2 = ensure_keepalive(AckProps, Channel1),
ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]), ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]),
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
case maybe_resume_session(Channel2) of case maybe_resume_session(Channel2) of
ignore -> {ok, AckPacket, Channel2}; ignore ->
{ok, [{enter, connected}, {outgoing, AckPacket}], Channel2};
{ok, Publishes, NSession} -> {ok, Publishes, NSession} ->
Channel3 = Channel2#channel{session = NSession, Channel3 = Channel2#channel{session = NSession,
resuming = false, resuming = false,
pendings = []}, pendings = []},
{ok, Packets, _} = handle_out({publish, Publishes}, Channel3), {ok, {outgoing, Packets}, _} = handle_out({publish, Publishes}, Channel3),
{ok, [AckPacket|Packets], Channel3} {ok, [{enter, connected}, {outgoing, [AckPacket|Packets]}], Channel3}
end; end;
handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo, handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
@ -564,25 +591,7 @@ handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
_Ver -> emqx_reason_codes:compat(connack, ReasonCode) _Ver -> emqx_reason_codes:compat(connack, ReasonCode)
end, end,
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
{stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel}; shutdown(Reason, ?CONNACK_PACKET(ReasonCode1), Channel);
handle_out({deliver, Delivers}, Channel = #channel{state = #{state_name := disconnected},
session = Session}) ->
NSession = emqx_session:enqueue(Delivers, Session),
{ok, Channel#channel{session = NSession}};
handle_out({deliver, Delivers}, Channel = #channel{takeover = true,
pendings = Pendings}) ->
{ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}};
handle_out({deliver, Delivers}, Channel = #channel{session = Session}) ->
case emqx_session:deliver(Delivers, Session) of
{ok, Publishes, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out({publish, Publishes}, ensure_timer(retry_timer, NChannel));
{ok, NSession} ->
{ok, Channel#channel{session = NSession}}
end;
handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
Packets = lists:foldl( Packets = lists:foldl(
@ -594,7 +603,7 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
end end
end, [], Publishes), end, [], Publishes),
NChannel = inc_pub_stats(publish_out, length(Packets), Channel), NChannel = inc_pub_stats(publish_out, length(Packets), Channel),
{ok, lists:reverse(Packets), NChannel}; {ok, {outgoing, lists:reverse(Packets)}, NChannel};
%% Ignore loop deliver %% Ignore loop deliver
handle_out({publish, _PacketId, #message{from = ClientId, handle_out({publish, _PacketId, #message{from = ClientId,
@ -640,24 +649,28 @@ handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = #{proto_ver :
ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer), ReasonName = emqx_reason_codes:name(ReasonCode, ProtoVer),
handle_out({disconnect, ReasonCode, ReasonName}, Channel); handle_out({disconnect, ReasonCode, ReasonName}, Channel);
%%TODO: Improve later...
handle_out({disconnect, ReasonCode, ReasonName}, handle_out({disconnect, ReasonCode, ReasonName},
Channel = #channel{conninfo = #{proto_ver := ProtoVer, Channel = #channel{conninfo = #{proto_ver := ProtoVer,
expiry_interval := ExpiryInterval}}) -> expiry_interval := ExpiryInterval}}) ->
case {ExpiryInterval, ProtoVer} of case {ExpiryInterval, ProtoVer} of
{0, ?MQTT_PROTO_V5} -> {0, ?MQTT_PROTO_V5} ->
{stop, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel}; shutdown(ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel);
{0, _Ver} -> {0, _Ver} ->
{stop, ReasonName, Channel}; shutdown(ReasonName, Channel);
{?UINT_MAX, ?MQTT_PROTO_V5} -> {?UINT_MAX, ?MQTT_PROTO_V5} ->
{close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), Channel}; Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)},
{close, ReasonName}],
{ok, Output, Channel};
{?UINT_MAX, _Ver} -> {?UINT_MAX, _Ver} ->
{close, ReasonName, Channel}; {ok, {close, ReasonName}, Channel};
{Interval, ?MQTT_PROTO_V5} -> {Interval, ?MQTT_PROTO_V5} ->
NChannel = ensure_timer(expire_timer, Interval, Channel), NChannel = ensure_timer(expire_timer, Interval, Channel),
{close, ReasonName, ?DISCONNECT_PACKET(ReasonCode), NChannel}; Output = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)},
{close, ReasonName}],
{ok, Output, NChannel};
{Interval, _Ver} -> {Interval, _Ver} ->
{close, ReasonName, ensure_timer(expire_timer, Interval, Channel)} NChannel = ensure_timer(expire_timer, Interval, Channel),
{ok, {close, ReasonName}, NChannel}
end; end;
handle_out({Type, Data}, Channel) -> handle_out({Type, Data}, Channel) ->
@ -668,28 +681,33 @@ handle_out({Type, Data}, Channel) ->
%% Handle call %% Handle call
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(handle_call(Req :: term(), channel())
-> {reply, Reply :: term(), channel()}
| {stop, Reason :: term(), Reply :: term(), channel()}).
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
{stop, {shutdown, kicked}, ok, Channel}; {stop, {shutdown, kicked}, ok, Channel};
handle_call(discard, Channel = #channel{state = #{state_name := connected}}) -> handle_call(discard, Channel = #channel{conn_state = connected}) ->
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
{stop, {shutdown, discarded}, Packet, ok, Channel}; {stop, {shutdown, discarded}, ok, Packet, Channel};
handle_call(discard, Channel = #channel{state = #{state_name := disconnected}}) ->
handle_call(discard, Channel = #channel{conn_state = disconnected}) ->
{stop, {shutdown, discarded}, ok, Channel}; {stop, {shutdown, discarded}, ok, Channel};
%% Session Takeover %% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
{ok, Session, Channel#channel{takeover = true}}; {reply, Session, Channel#channel{takeover = true}};
handle_call({takeover, 'end'}, Channel = #channel{session = Session, handle_call({takeover, 'end'}, Channel = #channel{session = Session,
pendings = Pendings}) -> pendings = Pendings}) ->
ok = emqx_session:takeover(Session), ok = emqx_session:takeover(Session),
AllPendings = lists:append(emqx_misc:drain_deliver(), Pendings), Delivers = emqx_misc:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings),
{stop, {shutdown, takeovered}, AllPendings, Channel}; {stop, {shutdown, takeovered}, AllPendings, Channel};
handle_call(Req, Channel) -> handle_call(Req, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
{ok, ignored, Channel}. {reply, ignored, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle Info %% Handle Info
@ -716,36 +734,32 @@ handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := Client
emqx_cm:set_chan_attrs(ClientId, Attrs), emqx_cm:set_chan_attrs(ClientId, Attrs),
emqx_cm:set_chan_stats(ClientId, Stats); emqx_cm:set_chan_stats(ClientId, Stats);
%%TODO: Fixme later handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
%%handle_info(disconnected, Channel = #channel{connected = undefined}) ->
%% shutdown(closed, Channel);
handle_info(disconnected, Channel = #channel{state = #{state_name := disconnected}}) ->
{ok, Channel}; {ok, Channel};
handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval}, handle_info({sock_closed, Reason}, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo = #{zone := Zone}, clientinfo = ClientInfo = #{zone := Zone},
will_msg = WillMsg}) -> will_msg = WillMsg}) ->
emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Channel), ConnInfo1 = ConnInfo#{disconnected_at => erlang:system_time(second)},
Channel1 = Channel#channel{conninfo = ConnInfo1, conn_state = disconnected},
Channel2 = case timer:seconds(will_delay_interval(WillMsg)) of Channel2 = case timer:seconds(will_delay_interval(WillMsg)) of
0 -> 0 -> publish_will_msg(WillMsg),
publish_will_msg(WillMsg),
Channel1#channel{will_msg = undefined}; Channel1#channel{will_msg = undefined};
_ -> _ -> ensure_timer(will_timer, Channel1)
ensure_timer(will_timer, Channel1)
end, end,
case ExpiryInterval of case maps:get(expiry_interval, ConnInfo) of
?UINT_MAX -> ?UINT_MAX ->
{ok, Channel2}; {ok, {enter, disconnected}, Channel2};
Int when Int > 0 -> Int when Int > 0 ->
{ok, ensure_timer(expire_timer, Channel2)}; {ok, {enter, disconnected}, ensure_timer(expire_timer, Channel2)};
_Other -> _Other ->
shutdown(closed, Channel2) shutdown(Reason, Channel2)
end; end;
handle_info(Info, Channel) -> handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p~n", [Info]), ?LOG(error, "Unexpected info: ~p~n", [Info]),
error(unexpected_info),
{ok, Channel}. {ok, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -859,14 +873,11 @@ will_delay_interval(undefined) -> 0;
will_delay_interval(WillMsg) -> will_delay_interval(WillMsg) ->
emqx_message:get_header('Will-Delay-Interval', WillMsg, 0). emqx_message:get_header('Will-Delay-Interval', WillMsg, 0).
%% TODO: Implement later.
disconnect(_Reason, Channel) -> {ok, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Terminate %% Terminate
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
terminate(_, #channel{state = #{state_name := initialized}}) -> terminate(_, #channel{conn_state = idle}) ->
ok; ok;
terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]); ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
@ -877,14 +888,6 @@ terminate(Reason, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, will_ms
publish_will_msg(WillMsg), publish_will_msg(WillMsg),
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]). ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
-spec(received(pos_integer(), channel()) -> channel()).
received(Oct, Channel) ->
ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)).
-spec(sent(pos_integer(), channel()) -> channel()).
sent(Oct, Channel) ->
ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)).
%% TODO: Improve will msg:) %% TODO: Improve will msg:)
publish_will_msg(undefined) -> publish_will_msg(undefined) ->
ok; ok;
@ -1064,11 +1067,11 @@ check_pub_alias(_Packet, _Channel) -> ok.
%% Check Pub Caps %% Check Pub Caps
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
retain = Retain retain = Retain},
} variable = #mqtt_packet_publish{topic_name = Topic}
}, },
#channel{clientinfo = #{zone := Zone}}) -> #channel{clientinfo = #{zone := Zone}}) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
%% Check Sub %% Check Sub
check_subscribe(TopicFilter, SubOpts, Channel) -> check_subscribe(TopicFilter, SubOpts, Channel) ->
@ -1142,11 +1145,6 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)}; inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)};
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined. init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
ensure_disconnected(Channel = #channel{state = FsmState}) ->
Channel#channel{state = FsmState#{state_name := disconnected,
disconnected_at => erlang:system_time(second)
}}.
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
ensure_keepalive_timer(Interval, Channel); ensure_keepalive_timer(Interval, Channel);
ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
@ -1206,3 +1204,6 @@ flag(false) -> 0.
shutdown(Reason, Channel) -> shutdown(Reason, Channel) ->
{stop, {shutdown, Reason}, Channel}. {stop, {shutdown, Reason}, Channel}.
shutdown(Reason, Packets, Channel) ->
{stop, {shutdown, Reason}, Packets, Channel}.

View File

@ -14,41 +14,43 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT TCP/SSL Connection %% MQTT/TCP Connection
-module(emqx_connection). -module(emqx_connection).
-behaviour(gen_statem).
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-logger_header("[Connection]"). -logger_header("[MQTT]").
-export([start_link/3]). %% API
-export([ start_link/3
, stop/1
]).
%% APIs
-export([ info/1 -export([ info/1
, stats/1 , stats/1
]). ]).
-export([call/2]). -export([call/2]).
%% state callbacks %% callback
-export([ idle/3 -export([init/4]).
, connected/3
, disconnected/3 %% Sys callbacks
-export([ system_continue/3
, system_terminate/4
, system_code_change/4
, system_get_state/1
]). ]).
%% gen_statem callbacks %% Internal callbacks
-export([ init/1 -export([wakeup_from_hib/2]).
, callback_mode/0
, code_change/4
, terminate/3
]).
-record(state, { -record(state, {
%% Parent
parent :: pid(),
%% TCP/TLS Transport %% TCP/TLS Transport
transport :: esockd:transport(), transport :: esockd:transport(),
%% TCP/TLS Socket %% TCP/TLS Socket
@ -57,10 +59,10 @@
peername :: emqx_types:peername(), peername :: emqx_types:peername(),
%% Sockname of the connection %% Sockname of the connection
sockname :: emqx_types:peername(), sockname :: emqx_types:peername(),
%% Sock state
sockstate :: emqx_types:sockstate(),
%% The {active, N} option %% The {active, N} option
active_n :: pos_integer(), active_n :: pos_integer(),
%% The active state
active_state :: running | blocked,
%% Publish Limit %% Publish Limit
pub_limit :: maybe(esockd_rate_limit:bucket()), pub_limit :: maybe(esockd_rate_limit:bucket()),
%% Rate Limit %% Rate Limit
@ -72,87 +74,107 @@
%% Serialize function %% Serialize function
serialize :: emqx_frame:serialize_fun(), serialize :: emqx_frame:serialize_fun(),
%% Channel State %% Channel State
chan_state :: emqx_channel:channel() channel :: emqx_channel:channel(),
%% Idle timer
idle_timer :: reference()
}). }).
-type(state() :: #state{}). -type(state() :: #state{}).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(HANDLE(T, C, D), handle((T), (C), (D))). -define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n,
-define(INFO_KEYS, [socktype, peername, sockname, active_n, active_state,
pub_limit, rate_limit]). pub_limit, rate_limit]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
%% @doc Start the connection.
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
-> {ok, pid()}). -> {ok, pid()}).
start_link(Transport, Socket, Options) -> start_link(Transport, Socket, Options) ->
{ok, proc_lib:spawn_link(?MODULE, init, [{Transport, Socket, Options}])}. CPid = proc_lib:spawn_link(?MODULE, init, [self(), Transport, Socket, Options]),
{ok, CPid}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Get infos of the connection. %% @doc Get infos of the connection/channel.
-spec(info(pid()|state()) -> emqx_types:infos()). -spec(info(pid()|state()) -> emqx_types:infos()).
info(CPid) when is_pid(CPid) -> info(CPid) when is_pid(CPid) ->
call(CPid, info); call(CPid, info);
info(Conn = #state{chan_state = ChanState}) -> info(State = #state{channel = Channel}) ->
ChanInfo = emqx_channel:info(ChanState), ChanInfo = emqx_channel:info(Channel),
SockInfo = maps:from_list(info(?INFO_KEYS, Conn)), SockInfo = maps:from_list(info(?INFO_KEYS, State)),
maps:merge(ChanInfo, #{sockinfo => SockInfo}). maps:merge(ChanInfo, #{sockinfo => SockInfo}).
info(Keys, Conn) when is_list(Keys) -> info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, Conn)} || Key <- Keys]; [{Key, info(Key, State)} || Key <- Keys];
info(socktype, #state{transport = Transport, socket = Socket}) -> info(socktype, #state{transport = Transport, socket = Socket}) ->
Transport:type(Socket); Transport:type(Socket);
info(peername, #state{peername = Peername}) -> info(peername, #state{peername = Peername}) ->
Peername; Peername;
info(sockname, #state{sockname = Sockname}) -> info(sockname, #state{sockname = Sockname}) ->
Sockname; Sockname;
info(sockstate, #state{sockstate = SockSt}) ->
SockSt;
info(active_n, #state{active_n = ActiveN}) -> info(active_n, #state{active_n = ActiveN}) ->
ActiveN; ActiveN;
info(active_state, #state{active_state = ActiveSt}) ->
ActiveSt;
info(pub_limit, #state{pub_limit = PubLimit}) -> info(pub_limit, #state{pub_limit = PubLimit}) ->
limit_info(PubLimit); limit_info(PubLimit);
info(rate_limit, #state{rate_limit = RateLimit}) -> info(rate_limit, #state{rate_limit = RateLimit}) ->
limit_info(RateLimit); limit_info(RateLimit);
info(chan_state, #state{chan_state = ChanState}) -> info(channel, #state{channel = Channel}) ->
emqx_channel:info(ChanState). emqx_channel:info(Channel).
limit_info(Limit) -> limit_info(Limit) ->
emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit). emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit).
%% @doc Get stats of the channel. %% @doc Get stats of the connection/channel.
-spec(stats(pid()|state()) -> emqx_types:stats()). -spec(stats(pid()|state()) -> emqx_types:stats()).
stats(CPid) when is_pid(CPid) -> stats(CPid) when is_pid(CPid) ->
call(CPid, stats); call(CPid, stats);
stats(#state{transport = Transport, stats(#state{transport = Transport,
socket = Socket, socket = Socket,
chan_state = ChanState}) -> channel = Channel}) ->
SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of
{ok, Ss} -> Ss; {ok, Ss} -> Ss;
{error, _} -> [] {error, _} -> []
end, end,
ConnStats = emqx_pd:get_counters(?CONN_STATS), ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(ChanState), ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(), ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
%% kick|discard|takeover call(Pid, Req) ->
-spec(call(pid(), Req :: term()) -> Reply :: term()). gen_server:call(Pid, Req, infinity).
call(CPid, Req) -> gen_statem:call(CPid, Req).
stop(Pid) ->
gen_server:stop(Pid).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_statem callbacks %% callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init({Transport, RawSocket, Options}) -> init(Parent, Transport, RawSocket, Options) ->
{ok, Socket} = Transport:wait(RawSocket), case Transport:wait(RawSocket) of
{ok, Socket} ->
do_init(Parent, Transport, Socket, Options);
{error, Reason} when Reason =:= enotconn;
Reason =:= einval;
Reason =:= closed ->
Transport:fast_close(RawSocket),
exit(normal);
{error, timeout} ->
Transport:fast_close(RawSocket),
exit({shutdown, ssl_upgrade_timeout});
{error, Reason} ->
Transport:fast_close(RawSocket),
exit(Reason)
end.
do_init(Parent, Transport, Socket, Options) ->
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
ConnInfo = #{socktype => Transport:type(Socket), ConnInfo = #{socktype => Transport:type(Socket),
peername => Peername, peername => Peername,
@ -160,7 +182,6 @@ init({Transport, RawSocket, Options}) ->
peercert => Peercert, peercert => Peercert,
conn_mod => ?MODULE conn_mod => ?MODULE
}, },
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)), PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
@ -168,328 +189,315 @@ init({Transport, RawSocket, Options}) ->
FrameOpts = emqx_zone:frame_options(Zone), FrameOpts = emqx_zone:frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_fun(), Serialize = emqx_frame:serialize_fun(),
ChanState = emqx_channel:init(ConnInfo, Options), Channel = emqx_channel:init(ConnInfo, Options),
State = #state{transport = Transport, IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout),
HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2),
State = #state{parent = Parent,
transport = Transport,
socket = Socket, socket = Socket,
peername = Peername, peername = Peername,
sockname = Sockname, sockname = Sockname,
sockstate = idle,
active_n = ActiveN, active_n = ActiveN,
active_state = running,
pub_limit = PubLimit, pub_limit = PubLimit,
rate_limit = RateLimit, rate_limit = RateLimit,
parse_state = ParseState, parse_state = ParseState,
serialize = Serialize, serialize = Serialize,
chan_state = ChanState channel = Channel,
idle_timer = IdleTimer
}, },
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), case activate_socket(State) of
gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], {ok, NState} ->
idle, State, self(), [IdleTimout]). recvloop(NState, #{hibernate_after => HibAfterTimeout});
{error, Reason} when Reason =:= einval;
Reason =:= enotconn;
Reason =:= closed ->
Transport:fast_close(Socket),
exit(normal);
{error, Reason} ->
Transport:fast_close(Socket),
erlang:exit({shutdown, Reason})
end.
-compile({inline, [init_limiter/1]}). -compile({inline, [init_limiter/1]}).
init_limiter(undefined) -> undefined; init_limiter(undefined) -> undefined;
init_limiter({Rate, Burst}) -> init_limiter({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst). esockd_rate_limit:new(Rate, Burst).
-compile({inline, [callback_mode/0]}). %%--------------------------------------------------------------------
callback_mode() -> %% Recv Loop
[state_functions, state_enter].
recvloop(State = #state{parent = Parent},
Options = #{hibernate_after := HibAfterTimeout}) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent,
?MODULE, [], {State, Options});
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
process_msg([Msg], State, Options)
after
HibAfterTimeout ->
hibernate(State, Options)
end.
hibernate(State, Options) ->
proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]).
wakeup_from_hib(State, Options) ->
%% Maybe do something later here.
recvloop(State, Options).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Idle State %% Process next Msg
idle(enter, _, State) -> process_msg([], State, Options) ->
case activate_socket(State) of recvloop(State, Options);
ok -> keep_state_and_data;
{error, Reason} -> process_msg([Msg|More], State, Options) ->
shutdown(Reason, State) case catch handle_msg(Msg, State) of
ok ->
process_msg(More, State, Options);
{ok, NState} ->
process_msg(More, NState, Options);
{ok, NextMsgs, NState} ->
process_msg(append_msg(NextMsgs, More), NState, Options);
{stop, Reason} ->
terminate(Reason, State);
{stop, Reason, NState} ->
terminate(Reason, NState);
{'EXIT', Reason} ->
terminate(Reason, State)
end.
%%--------------------------------------------------------------------
%% Handle a Msg
handle_msg({'$gen_call', From, Req}, State) ->
case handle_call(From, Req, State) of
{reply, Reply, NState} ->
gen_server:reply(From, Reply),
{ok, NState};
{stop, Reason, Reply, NState} ->
gen_server:reply(From, Reply),
stop(Reason, NState)
end; end;
idle(timeout, _Timeout, State) -> handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel})
shutdown(idle_timeout, State);
idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
Serialize = emqx_frame:serialize_fun(ConnPkt),
NState = State#state{serialize = Serialize},
handle_incoming(Packet, SuccFun, NState);
idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
handle_incoming(Packet, SuccFun, State);
idle(cast, {incoming, FrameError = {frame_error, _Reason}}, State) ->
handle_incoming(FrameError, State);
idle(EventType, Content, State) ->
?HANDLE(EventType, Content, State).
%%--------------------------------------------------------------------
%% Connected State
connected(enter, _PrevSt, State) ->
ok = register_self(State),
keep_state_and_data;
connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
handle_incoming(Packet, fun keep_state/1, State);
connected(cast, {incoming, FrameError = {frame_error, _Reason}}, State) ->
handle_incoming(FrameError, State);
connected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
handle_deliver(emqx_misc:drain_deliver([Deliver]), State);
connected(EventType, Content, State) ->
?HANDLE(EventType, Content, State).
%%--------------------------------------------------------------------
%% Disconnected State
disconnected(enter, _, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(disconnected, ChanState) of
{ok, NChanState} ->
ok = register_self(State#state{chan_state = NChanState}),
keep_state(State#state{chan_state = NChanState});
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end;
disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
handle_deliver([Deliver], State);
disconnected(EventType, Content, State) ->
?HANDLE(EventType, Content, State).
%%--------------------------------------------------------------------
%% Handle call
handle({call, From}, info, State) ->
reply(From, info(State), State);
handle({call, From}, stats, State) ->
reply(From, stats(State), State);
handle({call, From}, state, State) ->
reply(From, State, State);
handle({call, From}, Req, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_call(Req, ChanState) of
{ok, Reply, NChanState} ->
reply(From, Reply, State#state{chan_state = NChanState});
{stop, Reason, Reply, NChanState} ->
ok = gen_statem:reply(From, Reply),
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, Packet, Reply, NChanState} ->
handle_outgoing(Packet, State#state{chan_state = NChanState}),
ok = gen_statem:reply(From, Reply),
stop(Reason, State#state{chan_state = NChanState})
end;
%%--------------------------------------------------------------------
%% Handle cast
handle(cast, Msg, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(Msg, ChanState) of
ok -> {ok, State};
{ok, NChanState} ->
keep_state(State#state{chan_state = NChanState});
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end;
%%--------------------------------------------------------------------
%% Handle info
%% Handle incoming data
handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState})
when Inet == tcp; Inet == ssl -> when Inet == tcp; Inet == ssl ->
?LOG(debug, "RECV ~p", [Data]), ?LOG(debug, "RECV ~p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
emqx_pd:update_counter(incoming_bytes, Oct), emqx_pd:update_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct), ok = emqx_metrics:inc('bytes.received', Oct),
NChanState = emqx_channel:received(Oct, ChanState), {ok, NChannel} = emqx_channel:handle_in(Oct, Channel),
NState = State#state{chan_state = NChanState}, process_incoming(Data, State#state{channel = NChannel});
process_incoming(Data, NState);
handle(info, {Error, _Sock, Reason}, State) handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
State = #state{idle_timer = IdleTimer}) ->
ok = emqx_misc:cancel_timer(IdleTimer),
Serialize = emqx_frame:serialize_fun(ConnPkt),
NState = State#state{serialize = Serialize,
idle_timer = undefined
},
handle_incoming(Packet, NState);
handle_msg({incoming, Packet}, State) ->
handle_incoming(Packet, State);
handle_msg({Error, _Sock, Reason}, State)
when Error == tcp_error; Error == ssl_error -> when Error == tcp_error; Error == ssl_error ->
shutdown(Reason, State); handle_info({sock_error, Reason}, State);
handle(info, {Closed, _Sock}, State) handle_msg({Closed, _Sock}, State)
when Closed == tcp_closed; Closed == ssl_closed -> when Closed == tcp_closed; Closed == ssl_closed ->
{next_state, disconnected, State}; handle_info(sock_closed, State);
handle(info, {Passive, _Sock}, State) handle_msg({Passive, _Sock}, State)
when Passive == tcp_passive; Passive == ssl_passive -> when Passive == tcp_passive; Passive == ssl_passive ->
%% Rate limit here:) %% Rate limit and activate socket here.
NState = ensure_rate_limit(State), NState = ensure_rate_limit(State),
case activate_socket(NState) of case activate_socket(NState) of
ok -> keep_state(NState); {ok, NState} -> {ok, NState};
{error, Reason} -> {error, Reason} ->
shutdown(Reason, NState) {ok, {sock_error, Reason}, NState}
end; end;
handle(info, activate_socket, State) -> %% Rate limit timer expired.
%% Rate limit timer expired. handle_msg(activate_socket, State) ->
NState = State#state{active_state = running, NState = State#state{sockstate = idle,
limit_timer = undefined limit_timer = undefined
}, },
case activate_socket(NState) of case activate_socket(NState) of
ok -> keep_state(NState); {ok, NState} -> {ok, NState};
{error, Reason} -> {error, Reason} ->
shutdown(Reason, NState) {ok, {sock_error, Reason}, State}
end; end;
handle(info, {inet_reply, _Sock, ok}, _State) -> handle_msg(Deliver = {deliver, _Topic, _Msg},
%% something sent State = #state{channel = Channel}) ->
keep_state_and_data; Delivers = emqx_misc:drain_deliver([Deliver]),
Result = emqx_channel:handle_out(Delivers, Channel),
handle_return(Result, State);
handle(info, {inet_reply, _Sock, {error, Reason}}, State) -> handle_msg({outgoing, Packets}, State) ->
shutdown(Reason, State); {ok, handle_outgoing(Packets, State)};
handle(info, {timeout, TRef, keepalive}, %% something sent
State = #state{transport = Transport, socket = Socket}) -> handle_msg({inet_reply, _Sock, ok}, _State) ->
ok;
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
handle_msg({timeout, TRef, TMsg}, State) ->
handle_timeout(TRef, TMsg, State);
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
stop(Shutdown, State);
handle_msg(Msg, State) -> handle_info(Msg, State).
%%--------------------------------------------------------------------
%% Terminate
terminate(Reason, #state{transport = Transport,
socket = Socket,
sockstate = SockSt,
channel = Channel}) ->
?LOG(debug, "Terminated for ~p", [Reason]),
SockSt =:= closed orelse Transport:fast_close(Socket),
emqx_channel:terminate(Reason, Channel),
exit(Reason).
%%--------------------------------------------------------------------
%% Sys callbacks
system_continue(_Parent, _Deb, {State, Options}) ->
recvloop(State, Options).
system_terminate(Reason, _Parent, _Deb, {State, _}) ->
terminate(Reason, State).
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
system_get_state({State, _Options}) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Handle call
handle_call(_From, info, State) ->
{reply, info(State), State};
handle_call(_From, stats, State) ->
{reply, stats(State), State};
handle_call(_From, Req, State = #state{channel = Channel}) ->
case emqx_channel:handle_call(Req, Channel) of
{reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}};
{stop, Reason, Reply, NChannel} ->
{stop, Reason, Reply, State#state{channel = NChannel}};
{stop, Reason, Reply, OutPacket, NChannel} ->
NState = State#state{channel = NChannel},
NState1 = handle_outgoing(OutPacket, NState),
{stop, Reason, Reply, NState1}
end.
%%--------------------------------------------------------------------
%% Handle timeout
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
stop(idle_timeout, State);
handle_timeout(TRef, emit_stats, State) ->
handle_timeout(TRef, {emit_stats, stats(State)}, State);
handle_timeout(TRef, keepalive, State = #state{transport = Transport,
socket = Socket}) ->
case Transport:getstat(Socket, [recv_oct]) of case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State); handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} -> {error, Reason} ->
shutdown(Reason, State) handle_info({sockerr, Reason}, State)
end; end;
handle(info, {timeout, TRef, emit_stats}, State) -> handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
handle_timeout(TRef, {emit_stats, stats(State)}, State); handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
handle(info, {timeout, TRef, Msg}, State) ->
handle_timeout(TRef, Msg, State);
handle(info, {shutdown, Reason}, State) ->
shutdown(Reason, State);
handle(info, Info, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(Info, ChanState) of
{ok, NChanState} ->
keep_state(State#state{chan_state = NChanState});
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end.
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
terminate(Reason, _StateName, #state{transport = Transport,
socket = Socket,
chan_state = ChanState
}) ->
?LOG(debug, "Terminated for ~p", [Reason]),
ok = Transport:fast_close(Socket),
emqx_channel:terminate(Reason, ChanState).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Process/Parse incoming data.
%%--------------------------------------------------------------------
register_self(State = #state{active_n = ActiveN,
active_state = ActiveSt,
chan_state = ChanState
}) ->
ChanAttrs = emqx_channel:attrs(ChanState),
SockAttrs = #{active_n => ActiveN,
active_state => ActiveSt
},
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState).
%%--------------------------------------------------------------------
%% Process incoming data
-compile({inline, [process_incoming/2]}). -compile({inline, [process_incoming/2]}).
process_incoming(Data, State) -> process_incoming(Data, State) ->
process_incoming(Data, [], State). {Packets, NState} = parse_incoming(Data, State),
{ok, next_incoming_msgs(Packets), NState}.
process_incoming(<<>>, Packets, State) -> -compile({inline, [parse_incoming/2]}).
keep_state(State, next_incoming_events(Packets)); parse_incoming(Data, State) ->
parse_incoming(Data, [], State).
process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> parse_incoming(<<>>, Packets, State) ->
{Packets, State};
parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
try emqx_frame:parse(Data, ParseState) of try emqx_frame:parse(Data, ParseState) of
{more, NParseState} -> {more, NParseState} ->
NState = State#state{parse_state = NParseState}, {Packets, State#state{parse_state = NParseState}};
keep_state(NState, next_incoming_events(Packets));
{ok, Packet, Rest, NParseState} -> {ok, Packet, Rest, NParseState} ->
NState = State#state{parse_state = NParseState}, NState = State#state{parse_state = NParseState},
process_incoming(Rest, [Packet|Packets], NState) parse_incoming(Rest, [Packet|Packets], NState)
catch catch
error:Reason:Stk -> error:Reason:Stk ->
?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data:~p", ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nFrame data:~p",
[Reason, Stk, Data]), [Reason, Stk, Data]),
keep_state(State, next_incoming_events(Packets++[{frame_error, Reason}])) {[{frame_error, Reason}|Packets], State}
end. end.
-compile({inline, [next_incoming_events/1]}). next_incoming_msgs([Packet]) ->
next_incoming_events([]) -> []; {incoming, Packet};
next_incoming_events(Packets) -> next_incoming_msgs(Packets) ->
[next_event(cast, {incoming, Packet}) || Packet <- Packets]. [{incoming, Packet} || Packet <- lists:reverse(Packets)].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet
handle_incoming(Packet = ?PACKET(Type), SuccFun, State = #state{chan_state = ChanState}) -> handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
_ = inc_incoming_stats(Type), _ = inc_incoming_stats(Type),
_ = emqx_metrics:inc_recv(Packet), ok = emqx_metrics:inc_recv(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
case emqx_channel:handle_in(Packet, ChanState) of handle_return(emqx_channel:handle_in(Packet, Channel), State);
{ok, NChanState} ->
SuccFun(State#state{chan_state= NChanState});
{ok, OutPackets, NChanState} ->
NState = State#state{chan_state = NChanState},
handle_outgoing(OutPackets, SuccFun, NState);
{close, Reason, NChanState} ->
close(Reason, State#state{chan_state = NChanState});
{close, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
end.
handle_incoming(FrameError = {frame_error, _Reason}, State = #state{chan_state = ChanState}) -> handle_incoming(FrameError, State = #state{channel = Channel}) ->
case emqx_channel:handle_in(FrameError, ChanState) of handle_return(emqx_channel:handle_in(FrameError, Channel), State).
{close, Reason, NChanState} ->
close(Reason, State#state{chan_state = NChanState});
{close, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState));
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
end.
%%------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle deliver %% Handle channel return
handle_deliver(Delivers, State = #state{chan_state = ChanState}) -> handle_return(ok, State) ->
case emqx_channel:handle_out({deliver, Delivers}, ChanState) of {ok, State};
{ok, NChanState} -> handle_return({ok, NChannel}, State) ->
keep_state(State#state{chan_state = NChanState}); {ok, State#state{channel = NChannel}};
{ok, Packets, NChanState} -> handle_return({ok, Replies, NChannel}, State) ->
handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState}) {ok, next_msgs(Replies), State#state{channel = NChannel}};
end. handle_return({stop, Reason, NChannel}, State) ->
stop(Reason, State#state{channel = NChannel});
handle_return({stop, Reason, OutPacket, NChannel}, State) ->
NState = State#state{channel = NChannel},
NState1 = handle_outgoing(OutPacket, NState),
stop(Reason, NState1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packets %% Handle outgoing packets
handle_outgoing(Packets, State) when is_list(Packets) ->
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
handle_outgoing(Packet, State) -> handle_outgoing(Packet, State) ->
handle_outgoing(Packet, fun (_) -> ok end, State). send((serialize_and_inc_stats_fun(State))(Packet), State).
handle_outgoing(Packets, SuccFun, State) when is_list(Packets) ->
send(lists:map(serialize_and_inc_stats_fun(State), Packets), SuccFun, State);
handle_outgoing(Packet, SuccFun, State) ->
send((serialize_and_inc_stats_fun(State))(Packet), SuccFun, State).
serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet = ?PACKET(Type)) -> fun(Packet = ?PACKET(Type)) ->
@ -507,39 +515,73 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Send data %% Send data
send(IoData, SuccFun, State = #state{transport = Transport, send(IoData, State = #state{transport = Transport,
socket = Socket, socket = Socket,
chan_state = ChanState}) -> channel = Channel}) ->
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct), ok = emqx_metrics:inc('bytes.sent', Oct),
case Transport:async_send(Socket, IoData) of case Transport:async_send(Socket, IoData) of
ok -> NChanState = emqx_channel:sent(Oct, ChanState), ok ->
SuccFun(State#state{chan_state = NChanState}); {ok, NChannel} = emqx_channel:handle_out(Oct, Channel),
{error, Reason} -> State#state{channel = NChannel};
shutdown(Reason, State) Error = {error, _Reason} ->
%% Simulate an inet_reply to postpone handling the error
self() ! {inet_reply, Socket, Error}, State
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle timeout %% Handle Info
handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> handle_info({enter, _}, State = #state{active_n = ActiveN,
case emqx_channel:handle_timeout(TRef, Msg, ChanState) of sockstate = SockSt,
{ok, NChanState} -> channel = Channel}) ->
keep_state(State#state{chan_state = NChanState}); ChanAttrs = emqx_channel:attrs(Channel),
{ok, Packets, NChanState} -> SockAttrs = #{active_n => ActiveN,
handle_outgoing(Packets, fun keep_state/1, State#state{chan_state = NChanState}); sockstate => SockSt
{close, Reason, NChanState} -> },
close(Reason, State#state{chan_state = NChanState}); Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
{close, Reason, OutPackets, NChanState} -> handle_info({register, Attrs, stats(State)}, State);
NState = State#state{chan_state= NChanState},
close(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)); handle_info({sockerr, _Reason}, #state{sockstate = closed}) -> ok;
{stop, Reason, NChanState} -> handle_info({sockerr, Reason}, State) ->
stop(Reason, State#state{chan_state = NChanState}); ?LOG(debug, "Socket error: ~p", [Reason]),
{stop, Reason, OutPackets, NChanState} -> handle_info({sock_closed, Reason}, close_socket(State));
NState = State#state{chan_state= NChanState},
stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)) handle_info(sock_closed, #state{sockstate = closed}) -> ok;
handle_info(sock_closed, State) ->
?LOG(debug, "Socket closed"),
handle_info({sock_closed, closed}, close_socket(State));
handle_info({close, Reason}, State) ->
?LOG(debug, "Force close due to : ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_info(Info, Channel), State).
%%--------------------------------------------------------------------
%% Activate Socket
-compile({inline, [activate_socket/1]}).
activate_socket(State = #state{sockstate = closed}) ->
{ok, State};
activate_socket(State = #state{sockstate = blocked}) ->
{ok, State};
activate_socket(State = #state{transport = Transport,
socket = Socket,
active_n = N}) ->
case Transport:setopts(Socket, [{active, N}]) of
ok -> {ok, State#state{sockstate = running}};
Error -> Error
end. end.
%%--------------------------------------------------------------------
%% Close Socket
close_socket(State = #state{transport = Transport, socket = Socket}) ->
ok = Transport:fast_close(Socket),
State#state{sockstate = closed}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure rate limit %% Ensure rate limit
@ -561,22 +603,10 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
{Pause, Rl1} -> {Pause, Rl1} ->
?LOG(debug, "Pause ~pms due to rate limit", [Pause]), ?LOG(debug, "Pause ~pms due to rate limit", [Pause]),
TRef = erlang:send_after(Pause, self(), activate_socket), TRef = erlang:send_after(Pause, self(), activate_socket),
NState = State#state{active_state = blocked, NState = State#state{sockstate = blocked, limit_timer = TRef},
limit_timer = TRef
},
setelement(Pos, NState, Rl1) setelement(Pos, NState, Rl1)
end. end.
%%--------------------------------------------------------------------
%% Activate Socket
-compile({inline, [activate_socket/1]}).
activate_socket(#state{active_state = blocked}) -> ok;
activate_socket(#state{transport = Transport,
socket = Socket,
active_n = N}) ->
Transport:setopts(Socket, [{active, N}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Inc incoming/outgoing stats %% Inc incoming/outgoing stats
@ -593,41 +623,25 @@ inc_incoming_stats(Type) when is_integer(Type) ->
-compile({inline, [inc_outgoing_stats/1]}). -compile({inline, [inc_outgoing_stats/1]}).
inc_outgoing_stats(Type) -> inc_outgoing_stats(Type) ->
emqx_pd:update_counter(send_pkt, 1), emqx_pd:update_counter(send_pkt, 1),
(Type == ?PUBLISH) (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1).
andalso emqx_pd:update_counter(send_msg, 1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
-compile({inline, -compile({inline, [append_msg/2]}).
[ reply/3 append_msg(Msgs, Q) when is_list(Msgs) ->
, keep_state/1 lists:append(Msgs, Q);
, keep_state/2 append_msg(Msg, Q) -> [Msg|Q].
, next_event/2
, shutdown/2
, stop/2
]}).
reply(From, Reply, State) -> -compile({inline, [next_msgs/1]}).
{keep_state, State, [{reply, From, Reply}]}. next_msgs(Packet) when is_record(Packet, mqtt_packet) ->
{outgoing, Packet};
keep_state(State) -> next_msgs(Action) when is_tuple(Action) ->
{keep_state, State}. Action;
next_msgs(Actions) when is_list(Actions) ->
keep_state(State, Events) -> Actions.
{keep_state, State, Events}.
next_event(Type, Content) ->
{next_event, Type, Content}.
close(Reason, State = #state{transport = Transport, socket = Socket}) ->
?LOG(warning, "Closed for ~p", [Reason]),
ok = Transport:fast_close(Socket),
{next_state, disconnected, State}.
shutdown(Reason, State) ->
stop({shutdown, Reason}, State).
-compile({inline, [stop/2]}).
stop(Reason, State) -> stop(Reason, State) ->
{stop, Reason, State}. {stop, Reason, State}.

View File

@ -46,7 +46,7 @@
-define(UNLIMITED, 0). -define(UNLIMITED, 0).
-define(PUBCAP_KEYS, [max_topic_alias, -define(PUBCAP_KEYS, [max_topic_levels,
max_qos_allowed, max_qos_allowed,
retain_available retain_available
]). ]).
@ -73,8 +73,16 @@
retain => boolean()}) retain => boolean()})
-> ok_or_error(emqx_types:reason_code())). -> ok_or_error(emqx_types:reason_code())).
check_pub(Zone, Flags) when is_map(Flags) -> check_pub(Zone, Flags) when is_map(Flags) ->
do_check_pub(Flags, get_caps(Zone, publish)). do_check_pub(case maps:take(topic, Flags) of
{Topic, Flags1} ->
Flags1#{topic_levels => emqx_topic:levels(Topic)};
error ->
Flags
end, get_caps(Zone, publish)).
do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
when Limit > 0, Levels > Limit ->
{error, ?RC_TOPIC_NAME_INVALID};
do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS}) do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS})
when QoS > MaxQoS -> when QoS > MaxQoS ->
{error, ?RC_QOS_NOT_SUPPORTED}; {error, ?RC_QOS_NOT_SUPPORTED};

View File

@ -539,12 +539,12 @@ enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
Session = #session{upgrade_qos= false}) -> Session = #session{upgrade_qos= false}) ->
enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
enrich_subopt([{rap, 1}|Opts], Msg, Session) -> enrich_subopt([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
enrich_subopt(Opts, Msg, Session); enrich_subopt(Opts, emqx_message:set_flag(retain, true, Msg), Session);
enrich_subopt([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
enrich_subopt(Opts, Msg, Session);
enrich_subopt([{rap, 0}|Opts], Msg, Session) -> enrich_subopt([{rap, 0}|Opts], Msg, Session) ->
enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session); enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session);
enrich_subopt([{rap, 1}|Opts], Msg, Session) ->
enrich_subopt(Opts, Msg, Session);
enrich_subopt([{subid, SubId}|Opts], Msg, Session) -> enrich_subopt([{subid, SubId}|Opts], Msg, Session) ->
Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
enrich_subopt(Opts, Msg1, Session). enrich_subopt(Opts, Msg1, Session).

View File

@ -31,7 +31,9 @@
, subid/0 , subid/0
]). ]).
-export_type([ conninfo/0 -export_type([ socktype/0
, sockstate/0
, conninfo/0
, clientinfo/0 , clientinfo/0
, clientid/0 , clientid/0
, username/0 , username/0
@ -97,6 +99,7 @@
-type(subid() :: binary() | atom()). -type(subid() :: binary() | atom()).
-type(socktype() :: tcp | udp | ssl | proxy | atom()). -type(socktype() :: tcp | udp | ssl | proxy | atom()).
-type(sockstate() :: idle | running | blocked | closed).
-type(conninfo() :: #{socktype := socktype(), -type(conninfo() :: #{socktype := socktype(),
sockname := peername(), sockname := peername(),
peername := peername(), peername := peername(),

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT WebSocket Connection %% MQTT/WS Connection
-module(emqx_ws_connection). -module(emqx_ws_connection).
-include("emqx.hrl"). -include("emqx.hrl").
@ -22,8 +22,9 @@
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-logger_header("[WsConnection]"). -logger_header("[MQTT/WS]").
%% API
-export([ info/1 -export([ info/1
, stats/1 , stats/1
]). ]).
@ -35,6 +36,7 @@
, websocket_init/1 , websocket_init/1
, websocket_handle/2 , websocket_handle/2
, websocket_info/2 , websocket_info/2
, websocket_close/2
, terminate/3 , terminate/3
]). ]).
@ -43,14 +45,14 @@
peername :: emqx_types:peername(), peername :: emqx_types:peername(),
%% Sockname of the ws connection %% Sockname of the ws connection
sockname :: emqx_types:peername(), sockname :: emqx_types:peername(),
%% Conn state %% Sock state
conn_state :: idle | connected | disconnected, sockstate :: emqx_types:sockstate(),
%% Parser State %% Parser State
parse_state :: emqx_frame:parse_state(), parse_state :: emqx_frame:parse_state(),
%% Serialize function %% Serialize function
serialize :: emqx_frame:serialize_fun(), serialize :: emqx_frame:serialize_fun(),
%% Channel State %% Channel
chan_state :: emqx_channel:channel(), channel :: emqx_channel:channel(),
%% Out Pending Packets %% Out Pending Packets
pendings :: list(emqx_types:packet()), pendings :: list(emqx_types:packet()),
%% The stop reason %% The stop reason
@ -59,7 +61,7 @@
-type(state() :: #state{}). -type(state() :: #state{}).
-define(INFO_KEYS, [socktype, peername, sockname, active_state]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
@ -70,8 +72,8 @@
-spec(info(pid()|state()) -> emqx_types:infos()). -spec(info(pid()|state()) -> emqx_types:infos()).
info(WsPid) when is_pid(WsPid) -> info(WsPid) when is_pid(WsPid) ->
call(WsPid, info); call(WsPid, info);
info(WsConn = #state{chan_state = ChanState}) -> info(WsConn = #state{channel = Channel}) ->
ChanInfo = emqx_channel:info(ChanState), ChanInfo = emqx_channel:info(Channel),
SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)), SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)),
maps:merge(ChanInfo, #{sockinfo => SockInfo}). maps:merge(ChanInfo, #{sockinfo => SockInfo}).
@ -83,18 +85,18 @@ info(peername, #state{peername = Peername}) ->
Peername; Peername;
info(sockname, #state{sockname = Sockname}) -> info(sockname, #state{sockname = Sockname}) ->
Sockname; Sockname;
info(active_state, _State) -> info(sockstate, #state{sockstate = SockSt}) ->
running; SockSt;
info(chan_state, #state{chan_state = ChanState}) -> info(channel, #state{channel = Channel}) ->
emqx_channel:info(ChanState). emqx_channel:info(Channel).
-spec(stats(pid()|state()) -> emqx_types:stats()). -spec(stats(pid()|state()) -> emqx_types:stats()).
stats(WsPid) when is_pid(WsPid) -> stats(WsPid) when is_pid(WsPid) ->
call(WsPid, stats); call(WsPid, stats);
stats(#state{chan_state = ChanState}) -> stats(#state{channel = Channel}) ->
SockStats = emqx_pd:get_counters(?SOCK_STATS), SockStats = emqx_pd:get_counters(?SOCK_STATS),
ConnStats = emqx_pd:get_counters(?CONN_STATS), ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(ChanState), ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(), ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
@ -168,27 +170,26 @@ websocket_init([Req, Opts]) ->
FrameOpts = emqx_zone:frame_options(Zone), FrameOpts = emqx_zone:frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_fun(), Serialize = emqx_frame:serialize_fun(),
ChanState = emqx_channel:init(ConnInfo, Opts), Channel = emqx_channel:init(ConnInfo, Opts),
emqx_logger:set_metadata_peername(esockd_net:format(Peername)), emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
{ok, #state{peername = Peername, {ok, #state{peername = Peername,
sockname = Sockname, sockname = Sockname,
conn_state = idle, sockstate = idle,
parse_state = ParseState, parse_state = ParseState,
serialize = Serialize, serialize = Serialize,
chan_state = ChanState, channel = Channel,
pendings = [] pendings = []
}}. }}.
websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, Data}, State) when is_list(Data) ->
websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, iolist_to_binary(Data)}, State);
websocket_handle({binary, Data}, State = #state{chan_state = ChanState}) -> websocket_handle({binary, Data}, State = #state{channel = Channel}) ->
?LOG(debug, "RECV ~p", [Data]), ?LOG(debug, "RECV ~p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
ok = inc_recv_stats(1, Oct), ok = inc_recv_stats(1, Oct),
NChanState = emqx_channel:received(Oct, ChanState), {ok, NChannel} = emqx_channel:handle_in(Oct, Channel),
NState = State#state{chan_state = NChanState}, process_incoming(Data, State#state{channel = NChannel});
process_incoming(Data, NState);
%% Pings should be replied with pongs, cowboy does it automatically %% Pings should be replied with pongs, cowboy does it automatically
%% Pongs can be safely ignored. Clause here simply prevents crash. %% Pongs can be safely ignored. Clause here simply prevents crash.
@ -203,56 +204,27 @@ websocket_handle({FrameType, _}, State) ->
?LOG(error, "Unexpected frame - ~p", [FrameType]), ?LOG(error, "Unexpected frame - ~p", [FrameType]),
stop({shutdown, unexpected_ws_frame}, State). stop({shutdown, unexpected_ws_frame}, State).
websocket_info({call, From, info}, State) -> websocket_info({call, From, Req}, State) ->
gen_server:reply(From, info(State)), handle_call(From, Req, State);
{ok, State};
websocket_info({call, From, stats}, State) -> websocket_info({cast, Msg}, State = #state{channel = Channel}) ->
gen_server:reply(From, stats(State)), handle_return(emqx_channel:handle_info(Msg, Channel), State);
{ok, State};
websocket_info({call, From, state}, State) ->
gen_server:reply(From, State),
{ok, State};
websocket_info({call, From, Req}, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_call(Req, ChanState) of
{ok, Reply, NChanState} ->
_ = gen_server:reply(From, Reply),
{ok, State#state{chan_state = NChanState}};
{stop, Reason, Reply, NChanState} ->
_ = gen_server:reply(From, Reply),
stop(Reason, State#state{chan_state = NChanState})
end;
websocket_info({cast, Msg}, State = #state{chan_state = ChanState}) ->
case emqx_channel:handle_info(Msg, ChanState) of
ok -> {ok, State};
{ok, NChanState} ->
{ok, State#state{chan_state = NChanState}};
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end;
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
NState = State#state{serialize = emqx_frame:serialize_fun(ConnPkt)}, Serialize = emqx_frame:serialize_fun(ConnPkt),
handle_incoming(Packet, fun connected/1, NState); NState = State#state{sockstate = running,
serialize = Serialize
},
handle_incoming(Packet, NState);
websocket_info({incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> websocket_info({incoming, Packet}, State) ->
handle_incoming(Packet, fun reply/1, State); handle_incoming(Packet, State);
websocket_info({incoming, FrameError = {frame_error, _Reason}}, State) ->
handle_incoming(FrameError, State);
websocket_info(Deliver = {deliver, _Topic, _Msg}, websocket_info(Deliver = {deliver, _Topic, _Msg},
State = #state{chan_state = ChanState}) -> State = #state{channel = Channel}) ->
Delivers = emqx_misc:drain_deliver([Deliver]), Delivers = emqx_misc:drain_deliver([Deliver]),
case emqx_channel:handle_out({deliver, Delivers}, ChanState) of Result = emqx_channel:handle_out(Delivers, Channel),
{ok, NChanState} -> handle_return(Result, State);
reply(State#state{chan_state = NChanState});
{ok, Packets, NChanState} ->
reply(enqueue(Packets, State#state{chan_state = NChanState}))
end;
websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) -> websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) ->
RecvOct = emqx_pd:get_counter(recv_oct), RecvOct = emqx_pd:get_counter(recv_oct),
@ -264,60 +236,70 @@ websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) ->
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
handle_timeout(TRef, Msg, State); handle_timeout(TRef, Msg, State);
websocket_info({close, Reason}, State) ->
stop({shutdown, Reason}, State);
websocket_info({shutdown, Reason}, State) -> websocket_info({shutdown, Reason}, State) ->
stop({shutdown, Reason}, State); stop({shutdown, Reason}, State);
websocket_info({stop, Reason}, State) -> websocket_info({stop, Reason}, State) ->
stop(Reason, State); stop(Reason, State);
websocket_info(Info, State = #state{chan_state = ChanState}) -> websocket_info(Info, State) ->
case emqx_channel:handle_info(Info, ChanState) of handle_info(Info, State).
{ok, NChanState} ->
{ok, State#state{chan_state = NChanState}};
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState})
end.
terminate(SockError, _Req, #state{chan_state = ChanState, websocket_close(Reason, State) ->
?LOG(debug, "WebSocket closed due to ~p~n", [Reason]),
handle_info({sock_closed, Reason}, State).
terminate(SockError, _Req, #state{channel = Channel,
stop_reason = Reason}) -> stop_reason = Reason}) ->
?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]), ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]),
emqx_channel:terminate(Reason, ChanState). emqx_channel:terminate(Reason, Channel).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Connected callback %% Handle call
connected(State = #state{chan_state = ChanState}) -> handle_call(From, info, State) ->
ChanAttrs = emqx_channel:attrs(ChanState), gen_server:reply(From, info(State)),
SockAttrs = #{active_state => running}, {ok, State};
Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
ok = emqx_channel:handle_info({register, Attrs, stats(State)}, ChanState),
reply(State#state{conn_state = connected}).
%%-------------------------------------------------------------------- handle_call(From, stats, State) ->
%% Close gen_server:reply(From, stats(State)),
{ok, State};
close(Reason, State) -> handle_call(From, Req, State = #state{channel = Channel}) ->
?LOG(warning, "Closed for ~p", [Reason]), case emqx_channel:handle_call(Req, Channel) of
reply(State#state{conn_state = disconnected}). {reply, Reply, NChannel} ->
_ = gen_server:reply(From, Reply),
{ok, State#state{channel = NChannel}};
{stop, Reason, Reply, NChannel} ->
_ = gen_server:reply(From, Reply),
stop(Reason, State#state{channel = NChannel});
{stop, Reason, Reply, OutPacket, NChannel} ->
gen_server:reply(From, Reply),
NState = State#state{channel = NChannel},
stop(Reason, enqueue(OutPacket, NState))
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle timeout %% Handle timeout
handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
case emqx_channel:handle_timeout(TRef, Msg, ChanState) of handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
{ok, NChanState} ->
{ok, State#state{chan_state = NChanState}}; %%--------------------------------------------------------------------
{ok, Packets, NChanState} -> %% Handle Info
NState = State#state{chan_state = NChanState},
reply(enqueue(Packets, NState)); handle_info({enter, _}, State = #state{channel = Channel}) ->
{close, Reason, NChanState} -> ChanAttrs = emqx_channel:attrs(Channel),
close(Reason, State#state{chan_state = NChanState}); SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
{close, Reason, OutPackets, NChanState} -> Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
NState = State#state{chan_state= NChanState}, ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
close(Reason, enqueue(OutPackets, NState)); reply(State);
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState}) handle_info(Info, State = #state{channel = Channel}) ->
end. handle_return(emqx_channel:handle_info(Info, Channel), State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process incoming data %% Process incoming data
@ -343,48 +325,39 @@ process_incoming(Data, State = #state{parse_state = ParseState}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packets %% Handle incoming packets
handle_incoming(Packet = ?PACKET(Type), SuccFun, handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
State = #state{chan_state = ChanState}) ->
_ = inc_incoming_stats(Type), _ = inc_incoming_stats(Type),
_ = emqx_metrics:inc_recv(Packet), _ = emqx_metrics:inc_recv(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
case emqx_channel:handle_in(Packet, ChanState) of handle_return(emqx_channel:handle_in(Packet, Channel), State);
{ok, NChanState} ->
SuccFun(State#state{chan_state= NChanState});
{ok, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
SuccFun(enqueue(OutPackets, NState));
{close, Reason, NChanState} ->
close(Reason, State#state{chan_state = NChanState});
{close, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
close(Reason, enqueue(OutPackets, NState));
{stop, Reason, NChanState} ->
stop(Reason, State#state{chan_state = NChanState});
{stop, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state= NChanState},
stop(Reason, enqueue(OutPackets, NState))
end.
handle_incoming(FrameError = {frame_error, _Reason}, handle_incoming(FrameError, State = #state{channel = Channel}) ->
State = #state{chan_state = ChanState}) -> handle_return(emqx_channel:handle_in(FrameError, Channel), State).
case emqx_channel:handle_in(FrameError, ChanState) of
{stop, Reason, NChanState} -> %%--------------------------------------------------------------------
stop(Reason, State#state{chan_state = NChanState}); %% Handle channel return
{stop, Reason, OutPackets, NChanState} ->
NState = State#state{chan_state = NChanState}, handle_return(ok, State) ->
stop(Reason, enqueue(OutPackets, NState)) reply(State);
end. handle_return({ok, NChannel}, State) ->
reply(State#state{channel= NChannel});
handle_return({ok, Replies, NChannel}, State) ->
reply(Replies, State#state{channel= NChannel});
handle_return({stop, Reason, NChannel}, State) ->
stop(Reason, State#state{channel = NChannel});
handle_return({stop, Reason, OutPacket, NChannel}, State) ->
NState = State#state{channel = NChannel},
stop(Reason, enqueue(OutPacket, NState)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packets %% Handle outgoing packets
handle_outgoing(Packets, State = #state{chan_state = ChanState}) -> handle_outgoing(Packets, State = #state{channel = Channel}) ->
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = inc_sent_stats(length(Packets), Oct), ok = inc_sent_stats(length(Packets), Oct),
NChanState = emqx_channel:sent(Oct, ChanState), {ok, NChannel} = emqx_channel:handle_out(Oct, Channel),
{{binary, IoData}, State#state{chan_state = NChanState}}. {{binary, IoData}, State#state{channel = NChannel}}.
%% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1 %% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1
serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
@ -433,7 +406,25 @@ inc_sent_stats(Cnt, Oct) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Reply or Stop %% Reply or Stop
-compile({inline, [reply/1]}). reply(Packet, State) when is_record(Packet, mqtt_packet) ->
reply(enqueue(Packet, State));
reply({outgoing, Packets}, State) ->
reply(enqueue(Packets, State));
reply(Close = {close, _Reason}, State) ->
self() ! Close,
reply(State);
reply([], State) ->
reply(State);
reply([Packet|More], State) when is_record(Packet, mqtt_packet) ->
reply(More, enqueue(Packet, State));
reply([{outgoing, Packets}|More], State) ->
reply(More, enqueue(Packets, State));
reply([Other|More], State) ->
self() ! Other,
reply(More, State).
-compile({inline, [reply/1, enqueue/2]}).
reply(State = #state{pendings = []}) -> reply(State = #state{pendings = []}) ->
{ok, State}; {ok, State};
@ -441,6 +432,11 @@ reply(State = #state{pendings = Pendings}) ->
{Reply, NState} = handle_outgoing(Pendings, State), {Reply, NState} = handle_outgoing(Pendings, State),
{reply, Reply, NState#state{pendings = []}}. {reply, Reply, NState#state{pendings = []}}.
enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
enqueue([Packet], State);
enqueue(Packets, State = #state{pendings = Pendings}) ->
State#state{pendings = lists:append(Pendings, Packets)}.
stop(Reason, State = #state{pendings = []}) -> stop(Reason, State = #state{pendings = []}) ->
{stop, State#state{stop_reason = Reason}}; {stop, State#state{stop_reason = Reason}};
stop(Reason, State = #state{pendings = Pendings}) -> stop(Reason, State = #state{pendings = Pendings}) ->
@ -448,8 +444,3 @@ stop(Reason, State = #state{pendings = Pendings}) ->
State2 = State1#state{pendings = [], stop_reason = Reason}, State2 = State1#state{pendings = [], stop_reason = Reason},
{reply, [Reply, close], State2}. {reply, [Reply, close], State2}.
enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
enqueue([Packet], State);
enqueue(Packets, State = #state{pendings = Pendings}) ->
State#state{pendings = lists:append(Pendings, Packets)}.

View File

@ -64,17 +64,18 @@ t_handle_connect(_) ->
is_bridge = false, is_bridge = false,
clean_start = true, clean_start = true,
keepalive = 30, keepalive = 30,
properties = #{}, properties = undefined,
clientid = <<"clientid">>, clientid = <<"clientid">>,
username = <<"username">>, username = <<"username">>,
password = <<"passwd">> password = <<"passwd">>
}, },
with_channel( with_channel(
fun(Channel) -> fun(Channel) ->
{ok, ?CONNACK_PACKET(?RC_SUCCESS), Channel1} ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, #{}),
= handle_in(?CONNECT_PACKET(ConnPkt), Channel), ExpectedOutput = [{enter, connected},{outgoing, ConnAck}],
#{clientid := ClientId, username := Username} {ok, Output, Channel1} = handle_in(?CONNECT_PACKET(ConnPkt), Channel),
= emqx_channel:info(clientinfo, Channel1), ?assertEqual(ExpectedOutput, Output),
#{clientid := ClientId, username := Username} = emqx_channel:info(clientinfo, Channel1),
?assertEqual(<<"clientid">>, ClientId), ?assertEqual(<<"clientid">>, ClientId),
?assertEqual(<<"username">>, Username) ?assertEqual(<<"username">>, Username)
end). end).
@ -164,7 +165,7 @@ t_handle_pingreq(_) ->
t_handle_disconnect(_) -> t_handle_disconnect(_) ->
with_channel( with_channel(
fun(Channel) -> fun(Channel) ->
{stop, normal, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel), {stop, {shutdown, normal}, Channel1} = handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)) ?assertEqual(undefined, emqx_channel:info(will_msg, Channel1))
end). end).
@ -172,7 +173,7 @@ t_handle_in_auth(_) ->
with_channel( with_channel(
fun(Channel) -> fun(Channel) ->
Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR), Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
{stop, implementation_specific_error, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel) {stop, {shutdown, implementation_specific_error}, Packet, Channel} = handle_in(?AUTH_PACKET(), Channel)
end). end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -180,7 +181,7 @@ t_handle_in_auth(_) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_deliver(_) -> t_handle_deliver(_) ->
with_channel( with_connected_channel(
fun(Channel) -> fun(Channel) ->
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}], TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS#{qos => ?QOS_2}}],
{ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1} {ok, ?SUBACK_PACKET(1, [?QOS_2]), Channel1}
@ -188,7 +189,7 @@ t_handle_deliver(_) ->
Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>), Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>),
Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>), Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>),
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
{ok, Packets, _Ch} = emqx_channel:handle_out({deliver, Delivers}, Channel1), {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, Channel1),
?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets]) ?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets])
end). end).
@ -206,10 +207,9 @@ t_handle_out_connack(_) ->
}, },
with_channel( with_channel(
fun(Channel) -> fun(Channel) ->
{ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, _), _} {ok, [{enter, connected},{outgoing, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan}
= handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel), = handle_out({connack, ?RC_SUCCESS, 0, ConnPkt}, Channel),
{stop, {shutdown, not_authorized}, {stop, {shutdown, not_authorized}, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _}
= handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel) = handle_out({connack, ?RC_NOT_AUTHORIZED, ConnPkt}, Channel)
end). end).
@ -220,7 +220,7 @@ t_handle_out_publish(_) ->
Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)}, Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
{ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel), {ok, ?PUBLISH_PACKET(?QOS_0), Channel} = handle_out(Pub0, Channel),
{ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel), {ok, ?PUBLISH_PACKET(?QOS_1), Channel} = handle_out(Pub1, Channel),
{ok, Packets, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel), {ok, {outgoing, Packets}, Channel1} = handle_out({publish, [Pub0, Pub1]}, Channel),
?assertEqual(2, length(Packets)), ?assertEqual(2, length(Packets)),
?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1)) ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, Channel1))
end). end).
@ -304,6 +304,12 @@ t_terminate(_) ->
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
with_connected_channel(TestFun) ->
with_channel(
fun(Channel) ->
TestFun(emqx_channel:set_field(conn_state, connected, Channel))
end).
with_channel(TestFun) -> with_channel(TestFun) ->
with_channel(#{}, TestFun). with_channel(#{}, TestFun).

View File

@ -97,7 +97,7 @@ t_cm(_) ->
ClientId = <<"myclient">>, ClientId = <<"myclient">>,
{ok, C} = emqtt:start_link([{clientid, ClientId}]), {ok, C} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
ct:sleep(50), ct:sleep(500),
#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_attrs(ClientId), #{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_attrs(ClientId),
emqtt:subscribe(C, <<"mytopic">>, 0), emqtt:subscribe(C, <<"mytopic">>, 0),
ct:sleep(1200), ct:sleep(1200),

View File

@ -25,6 +25,7 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_logger:set_log_level(emergency),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->

View File

@ -39,6 +39,7 @@ groups() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_logger:set_log_level(emergency),
application:ensure_all_started(gproc), application:ensure_all_started(gproc),
Config. Config.