Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-03-21 04:14:44 +08:00
commit dac1b92d8f
2 changed files with 54 additions and 15 deletions

View File

@ -192,10 +192,10 @@ idle(enter, _, State) ->
idle(timeout, _Timeout, State) -> idle(timeout, _Timeout, State) ->
{stop, idle_timeout, State}; {stop, idle_timeout, State};
idle(cast, {incoming, Packet, PState}, _State) -> idle(cast, {incoming, Packet}, State) ->
handle_packet(Packet, fun(NState) -> handle_packet(Packet, fun(NState) ->
{next_state, connected, reset_parser(NState)} {next_state, connected, reset_parser(NState)}
end, PState); end, State);
idle(EventType, Content, State) -> idle(EventType, Content, State) ->
?HANDLE(EventType, Content, State). ?HANDLE(EventType, Content, State).
@ -208,12 +208,12 @@ connected(enter, _, _State) ->
keep_state_and_data; keep_state_and_data;
%% Handle Input %% Handle Input
connected(cast, {incoming, Packet = ?PACKET(Type), PState}, _State) -> connected(cast, {incoming, Packet = ?PACKET(Type)}, State) ->
_ = emqx_metrics:received(Packet), _ = emqx_metrics:received(Packet),
(Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1), (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
handle_packet(Packet, fun(NState) -> handle_packet(Packet, fun(NState) ->
{keep_state, reset_parser(NState)} {keep_state, NState}
end, PState); end, State);
%% Handle Output %% Handle Output
connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
@ -373,14 +373,14 @@ terminate(Reason, _StateName, #state{transport = Transport,
%% Process incoming data %% Process incoming data
process_incoming(<<>>, Packets, State) -> process_incoming(<<>>, Packets, State) ->
{keep_state, State, next_events({Packets, State})}; {keep_state, State, next_events(Packets)};
process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
try emqx_frame:parse(Data, ParseState) of try emqx_frame:parse(Data, ParseState) of
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
process_incoming(Rest, [Packet|Packets], reset_parser(State)); process_incoming(Rest, [Packet|Packets], reset_parser(State));
{more, NewParseState} -> {more, NewParseState} ->
{keep_state, State#state{parse_state = NewParseState}, next_events({Packets, State})}; {keep_state, State#state{parse_state = NewParseState}, next_events(Packets)};
{error, Reason} -> {error, Reason} ->
shutdown(Reason, State) shutdown(Reason, State)
catch catch
@ -392,12 +392,10 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
reset_parser(State = #state{proto_state = ProtoState}) -> reset_parser(State = #state{proto_state = ProtoState}) ->
State#state{parse_state = emqx_protocol:parser(ProtoState)}. State#state{parse_state = emqx_protocol:parser(ProtoState)}.
next_events([]) -> next_events(Packets) when is_list(Packets) ->
[]; [next_events(Packet) || Packet <- lists:reverse(Packets)];
next_events([{Packet, State}]) -> next_events(Packet) ->
{next_event, cast, {incoming, Packet, State}}; {next_event, cast, {incoming, Packet}}.
next_events({Packets, State}) ->
[next_events([{Packet, State}]) || Packet <- lists:reverse(Packets)].
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet

View File

@ -36,7 +36,8 @@ all() ->
{group, mqtt_common}, {group, mqtt_common},
{group, mqttv4}, {group, mqttv4},
{group, mqttv5}, {group, mqttv5},
{group, acl} {group, acl},
{group, frame_partial}
]. ].
groups() -> groups() ->
@ -51,7 +52,9 @@ groups() ->
[connect_v5, [connect_v5,
subscribe_v5]}, subscribe_v5]},
{acl, [sequence], {acl, [sequence],
[acl_deny_action_ct]}]. [acl_deny_action_ct]},
{frame_partial, [sequence],
[handle_followed_packet]}].
init_per_suite(Config) -> init_per_suite(Config) ->
[start_apps(App, SchemaFile, ConfigFile) || [start_apps(App, SchemaFile, ConfigFile) ||
@ -97,6 +100,44 @@ with_connection(DoFun) ->
% emqx_client_sock:close(Sock) % emqx_client_sock:close(Sock)
% end. % end.
handle_followed_packet(_Config) ->
ConnPkt = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
PartialPkt1 = <<50,182,1,0,4,116,101,115,116,0,1,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>,
PartialPkt2 = <<48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>,
%% This is a PUBLISH message (Qos=1)
PubPkt = <<PartialPkt1/binary, PartialPkt2/binary>>,
ComplexPkt = <<PubPkt/binary, PubPkt/binary, PubPkt/binary, PartialPkt1/binary>>,
AssertConnAck = fun(R) -> ?assertEqual({ok, <<32,2,0,0>>}, R) end,
AssertPubAck = fun(R) -> ?assertEqual({ok, <<64,2,0,1>>}, R) end,
{ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [{active, false}, binary]),
%% CONNECT
ok = gen_tcp:send(Sock, ConnPkt),
AssertConnAck(gen_tcp:recv(Sock, 4, 500)),
%% Once Publish
ok = gen_tcp:send(Sock, PubPkt),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
%% Complex Packet
ok = gen_tcp:send(Sock, ComplexPkt),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
ok = gen_tcp:send(Sock, PartialPkt2),
AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
gen_tcp:close(Sock).
connect_v4(_) -> connect_v4(_) ->
with_connection(fun([Sock]) -> with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))), emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))),