Merge pull request #2281 from emqx/eliminate-inflight-full-error

Eliminate inflight full error
This commit is contained in:
spring2maz 2019-03-09 09:07:12 +01:00 committed by GitHub
commit ad74772191
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 81 deletions

View File

@ -110,7 +110,7 @@ safe_stop(Pid, StopF, Timeout) ->
send(Conn, Batch) -> send(Conn, Batch) ->
send(Conn, Batch, []). send(Conn, Batch, []).
send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest] = Batch, Acc) -> send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest], Acc) ->
case emqx_client:publish(ClientPid, Msg) of case emqx_client:publish(ClientPid, Msg) of
{ok, PktId} when Rest =:= [] -> {ok, PktId} when Rest =:= [] ->
%% last one sent %% last one sent
@ -119,9 +119,6 @@ send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Re
{ok, Ref}; {ok, Ref};
{ok, PktId} -> {ok, PktId} ->
send(Conn, Rest, [PktId | Acc]); send(Conn, Rest, [PktId | Acc]);
{error, {_PacketId, inflight_full}} ->
timer:sleep(10),
send(Conn, Batch, Acc);
{error, Reason} -> {error, Reason} ->
%% NOTE: There is no partial success of a batch and recover from the middle %% NOTE: There is no partial success of a batch and recover from the middle
%% only to retry all messages in one batch %% only to retry all messages in one batch

View File

@ -38,7 +38,7 @@
%% For test cases %% For test cases
-export([pause/1, resume/1]). -export([pause/1, resume/1]).
-export([initialized/3, waiting_for_connack/3, connected/3]). -export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]).
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, -export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
@ -743,12 +743,12 @@ waiting_for_connack(EventType, EventContent, State) ->
false -> {stop, connack_timeout} false -> {stop, connack_timeout}
end. end.
connected({call, From}, subscriptions, State = #state{subscriptions = Subscriptions}) -> connected({call, From}, subscriptions, #state{subscriptions = Subscriptions}) ->
{keep_state, State, [{reply, From, maps:to_list(Subscriptions)}]}; {keep_state_and_data, [{reply, From, maps:to_list(Subscriptions)}]};
connected({call, From}, info, State) -> connected({call, From}, info, State) ->
Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))), Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))),
{keep_state, State, [{reply, From, Info}]}; {keep_state_and_data, [{reply, From, Info}]};
connected({call, From}, pause, State) -> connected({call, From}, pause, State) ->
{keep_state, State#state{paused = true}, [{reply, From, ok}]}; {keep_state, State#state{paused = true}, [{reply, From, ok}]};
@ -756,11 +756,11 @@ connected({call, From}, pause, State) ->
connected({call, From}, resume, State) -> connected({call, From}, resume, State) ->
{keep_state, State#state{paused = false}, [{reply, From, ok}]}; {keep_state, State#state{paused = false}, [{reply, From, ok}]};
connected({call, From}, get_properties, State = #state{properties = Properties}) -> connected({call, From}, get_properties, #state{properties = Properties}) ->
{keep_state, State, [{reply, From, Properties}]}; {keep_state_and_data, [{reply, From, Properties}]};
connected({call, From}, client_id, State = #state{client_id = ClientId}) -> connected({call, From}, client_id, #state{client_id = ClientId}) ->
{keep_state, State, [{reply, From, ClientId}]}; {keep_state_and_data, [{reply, From, ClientId}]};
connected({call, From}, {set_request_handler, RequestHandler}, State) -> connected({call, From}, {set_request_handler, RequestHandler}, State) ->
{keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]}; {keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]};
@ -790,20 +790,19 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) ->
connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
State = #state{inflight = Inflight, last_packet_id = PacketId}) State = #state{inflight = Inflight, last_packet_id = PacketId})
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
case emqx_inflight:is_full(Inflight) of Msg1 = Msg#mqtt_msg{packet_id = PacketId},
true -> case send(Msg1, State) of
{keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]}; {ok, NewState} ->
false -> Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
Msg1 = Msg#mqtt_msg{packet_id = PacketId}, State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}),
case send(Msg1, State) of Actions = [{reply, From, {ok, PacketId}}],
{ok, NewState} -> case emqx_inflight:is_full(Inflight1) of
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), true -> {next_state, inflight_full, State1, Actions};
{keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), false -> {keep_state, State1, Actions}
[{reply, From, {ok, PacketId}}]}; end;
{error, Reason} -> {error, Reason} ->
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
end end;
end;
connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics}, connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics},
State = #state{last_packet_id = PacketId}) -> State = #state{last_packet_id = PacketId}) ->
@ -844,8 +843,8 @@ connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) ->
connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State); send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State);
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) -> connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) ->
{keep_state, State}; keep_state_and_data;
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload), connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload),
State) when Properties =/= undefined -> State) when Properties =/= undefined ->
@ -869,18 +868,8 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties,
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
publish_process(?QOS_2, Packet, State); publish_process(?QOS_2, Packet, State);
connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
State = #state{inflight = Inflight}) -> {keep_state, delete_inflight(PubAck, State)};
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
reason_code => ReasonCode,
properties => Properties}),
{keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
none ->
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
{keep_state, State}
end;
connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -> connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) ->
send_puback(?PUBREL_PACKET(PacketId), send_puback(?PUBREL_PACKET(PacketId),
@ -908,21 +897,11 @@ connected(cast, ?PUBREL_PACKET(PacketId),
end; end;
error -> error ->
emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]), emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]),
{keep_state, State} keep_state_and_data
end; end;
connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
State = #state{inflight = Inflight}) -> {keep_state, delete_inflight(PubComp, State)};
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {pubrel, _PacketId, _Ts}} ->
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
reason_code => ReasonCode,
properties => Properties}),
{keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
none ->
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
{keep_state, State}
end;
connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
State = #state{subscriptions = _Subscriptions}) -> State = #state{subscriptions = _Subscriptions}) ->
@ -931,7 +910,8 @@ connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
%%TODO: Merge reason codes to subscriptions? %%TODO: Merge reason codes to subscriptions?
Reply = {ok, Properties, ReasonCodes}, Reply = {ok, Properties, ReasonCodes},
{keep_state, NewState, [{reply, From, Reply}]}; {keep_state, NewState, [{reply, From, Reply}]};
false -> {keep_state, State} false ->
keep_state_and_data
end; end;
connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
@ -944,16 +924,18 @@ connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
end, Subscriptions, Topics), end, Subscriptions, Topics),
{keep_state, NewState#state{subscriptions = Subscriptions1}, {keep_state, NewState#state{subscriptions = Subscriptions1},
[{reply, From, {ok, Properties, ReasonCodes}}]}; [{reply, From, {ok, Properties, ReasonCodes}}]};
false -> {keep_state, State} false ->
keep_state_and_data
end; end;
connected(cast, ?PACKET(?PINGRESP), State = #state{pending_calls = []}) -> connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) ->
{keep_state, State}; keep_state_and_data;
connected(cast, ?PACKET(?PINGRESP), State) -> connected(cast, ?PACKET(?PINGRESP), State) ->
case take_call(ping, State) of case take_call(ping, State) of
{value, #call{from = From}, NewState} -> {value, #call{from = From}, NewState} ->
{keep_state, NewState, [{reply, From, pong}]}; {keep_state, NewState, [{reply, From, pong}]};
false -> {keep_state, State} false ->
keep_state_and_data
end; end;
connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) ->
@ -998,14 +980,16 @@ connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef,
connected(EventType, EventContent, Data) -> connected(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, connected, Data). handle_event(EventType, EventContent, connected, Data).
should_ping(Sock) -> inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
case emqx_client_sock:getstat(Sock, [send_oct]) of {keep_state_and_data, [postpone]};
{ok, [{send_oct, Val}]} -> inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
OldVal = get(send_oct), put(send_oct, Val), delete_inflight_when_full(PubAck, State);
OldVal == undefined orelse OldVal == Val; inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
Error = {error, _Reason} -> delete_inflight_when_full(PubComp, State);
Error inflight_full(EventType, EventContent, Data) ->
end. %% inflight_full is a sub-state of connected state,
%% delegate all other events to connected state.
connected(EventType, EventContent, Data).
handle_event({call, From}, stop, _StateName, _State) -> handle_event({call, From}, stop, _StateName, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]}; {stop_and_reply, normal, [{reply, From, ok}]};
@ -1028,17 +1012,17 @@ handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]), emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]),
{stop, {shutdown, Reason}, State}; {stop, {shutdown, Reason}, State};
handle_event(info, {inet_reply, _Sock, ok}, _, State) -> handle_event(info, {inet_reply, _Sock, ok}, _, _State) ->
{keep_state, State}; keep_state_and_data;
handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]), emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]),
{stop, {shutdown, Reason}, State}; {stop, {shutdown, Reason}, State};
handle_event(EventType, EventContent, StateName, StateData) -> handle_event(EventType, EventContent, StateName, _StateData) ->
emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)", emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]), [StateName, EventType, EventContent]),
{keep_state, StateData}. keep_state_and_data.
%% Mandatory callback functions %% Mandatory callback functions
terminate(Reason, _StateName, State = #state{socket = Socket}) -> terminate(Reason, _StateName, State = #state{socket = Socket}) ->
@ -1061,6 +1045,47 @@ code_change(_Vsn, State, Data, _Extra) ->
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Decide whether a ping is due, based on the socket's send_oct statistic.
%% The latest send_oct value is remembered in the process dictionary under
%% the key send_oct. Returns true when no value was stored before or when
%% the counter has not changed since the previous call, false when it has
%% changed, and the {error, Reason} tuple untouched when the statistic
%% cannot be read.
should_ping(Sock) ->
    case emqx_client_sock:getstat(Sock, [send_oct]) of
        {ok, [{send_oct, Current}]} ->
            %% put/2 hands back the previously stored value (undefined if none),
            %% so storing and reading the old counter is a single step.
            Previous = put(send_oct, Current),
            Previous == undefined orelse Previous == Current;
        {error, _Reason} = Error ->
            Error
    end.
%% Remove the inflight entry acknowledged by a PUBACK or PUBCOMP packet and
%% return the resulting state.
%%
%% PUBACK: the entry must be a {publish, Msg, Ts} whose message carries the
%% same packet id. PUBCOMP: the entry must be a {pubrel, _, Ts}.
%% In both cases the message handler is evaluated with the `puback' event
%% (packet id, reason code, properties) before the entry is deleted.
%% When no entry exists for the packet id a warning is logged and the state
%% is returned unchanged.
delete_inflight(?PUBACK_PACKET(Id, RC, Props), #state{inflight = Window} = State) ->
    case emqx_inflight:lookup(Id, Window) of
        none ->
            emqx_logger:warning("Unexpected PUBACK: ~p", [Id]),
            State;
        {value, {publish, #mqtt_msg{packet_id = Id}, _SentAt}} ->
            AckInfo = #{packet_id => Id,
                        reason_code => RC,
                        properties => Props},
            ok = eval_msg_handler(State, puback, AckInfo),
            State#state{inflight = emqx_inflight:delete(Id, Window)}
    end;
delete_inflight(?PUBCOMP_PACKET(Id, RC, Props), #state{inflight = Window} = State) ->
    case emqx_inflight:lookup(Id, Window) of
        none ->
            emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [Id]),
            State;
        {value, {pubrel, _RelId, _SentAt}} ->
            AckInfo = #{packet_id => Id,
                        reason_code => RC,
                        properties => Props},
            %% NOTE: PUBCOMP is reported through the same `puback' handler event.
            ok = eval_msg_handler(State, puback, AckInfo),
            State#state{inflight = emqx_inflight:delete(Id, Window)}
    end.
%% Process a PUBACK/PUBCOMP via delete_inflight/2 and build the state
%% callback result: move to the connected state once the inflight window
%% is no longer full, otherwise keep the current state with the updated data.
delete_inflight_when_full(Packet, State0) ->
    #state{inflight = Window} = Updated = delete_inflight(Packet, State0),
    case emqx_inflight:is_full(Window) of
        false -> {next_state, connected, Updated};
        true -> {keep_state, Updated}
    end.
%% Subscribe to response topic. %% Subscribe to response topic.
-spec(sub_response_topic(client(), qos(), topic()) -> ok). -spec(sub_response_topic(client(), qos(), topic()) -> ok).
sub_response_topic(Client, QoS, Topic) when is_binary(Topic) -> sub_response_topic(Client, QoS, Topic) when is_binary(Topic) ->
@ -1222,11 +1247,12 @@ ensure_ack_timer(State = #state{ack_timer = undefined,
ensure_ack_timer(State) -> State. ensure_ack_timer(State) -> State.
ensure_retry_timer(State = #state{retry_interval = Interval}) -> ensure_retry_timer(State = #state{retry_interval = Interval}) ->
ensure_retry_timer(Interval, State). do_ensure_retry_timer(Interval, State).
ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
when Interval > 0 -> when Interval > 0 ->
State#state{retry_timer = erlang:start_timer(Interval, self(), retry)}; State#state{retry_timer = erlang:start_timer(Interval, self(), retry)};
ensure_retry_timer(_Interval, State) -> do_ensure_retry_timer(_Interval, State) ->
State. State.
retry_send(State = #state{inflight = Inflight}) -> retry_send(State = #state{inflight = Inflight}) ->
@ -1243,7 +1269,7 @@ retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interv
{ok, NewState} -> retry_send(Msgs, Now, NewState); {ok, NewState} -> retry_send(Msgs, Now, NewState);
{error, Error} -> {stop, Error} {error, Error} -> {stop, Error}
end; end;
false -> {keep_state, ensure_retry_timer(Interval - Diff, State)} false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)}
end. end.
retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId}, retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId},

View File

@ -28,13 +28,8 @@ send_and_ack_test() ->
fun(Pid) -> Pid ! stop end), fun(Pid) -> Pid ! stop end),
meck:expect(emqx_client, publish, 2, meck:expect(emqx_client, publish, 2,
fun(Client, Msg) -> fun(Client, Msg) ->
case rand:uniform(200) of Client ! {publish, Msg},
1 -> {ok, Msg} %% as packet id
{error, {dummy, inflight_full}};
_ ->
Client ! {publish, Msg},
{ok, Msg} %% as packet id
end
end), end),
try try
Max = 100, Max = 100,

View File

@ -47,12 +47,14 @@ timer_cancel_flush_test() ->
end. end.
shutdown_disabled_test() -> shutdown_disabled_test() ->
ok = drain(),
self() ! foo, self() ! foo,
?assertEqual(continue, conn_proc_mng_policy(0)), ?assertEqual(continue, conn_proc_mng_policy(0)),
receive foo -> ok end, receive foo -> ok end,
?assertEqual(hibernate, conn_proc_mng_policy(0)). ?assertEqual(hibernate, conn_proc_mng_policy(0)).
message_queue_too_long_test() -> message_queue_too_long_test() ->
ok = drain(),
self() ! foo, self() ! foo,
self() ! bar, self() ! bar,
?assertEqual({shutdown, message_queue_too_long}, ?assertEqual({shutdown, message_queue_too_long},
@ -63,3 +65,18 @@ message_queue_too_long_test() ->
conn_proc_mng_policy(L) -> conn_proc_mng_policy(L) ->
emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}). emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}).
%% Empty the calling process's message queue so that tests depending on the
%% queue length start from a known state. The drained messages are discarded
%% (they could be logged here if ever needed). Always returns ok.
drain() ->
    _Discarded = drain([]),
    ok.
%% Collect every message currently in the caller's mailbox without blocking.
%% Messages are pushed onto the accumulator as they are received; once the
%% mailbox is empty (zero timeout) the accumulator is reversed, so the
%% messages received by this call are returned in arrival order.
drain(Collected) ->
    receive
        Any -> drain([Any | Collected])
    after 0 ->
        lists:reverse(Collected)
    end.