Eliminate {error, inflight_full} from publish call to emqx_client
Prior to this change, if a emqx_client:publish/? caller sends in QoS=1/2 messages too fast, emqx_client may return `{error, inflight_full}` which could put put the caller to an awkward situation: there seem to be no ohter option except for putting self to a sleep-n-retry infinite loop. In this change, a new gen_statm state 'inflight_full' is introduced as a sub-state of 'connected'. When emqx_client is in 'inflight_full' state, it postpone all publish calls (for QoS=1/2) until inflight window size shrinks.
This commit is contained in:
parent
22aa7d4668
commit
a056a4cbde
|
@ -110,7 +110,7 @@ safe_stop(Pid, StopF, Timeout) ->
|
||||||
send(Conn, Batch) ->
|
send(Conn, Batch) ->
|
||||||
send(Conn, Batch, []).
|
send(Conn, Batch, []).
|
||||||
|
|
||||||
send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest] = Batch, Acc) ->
|
send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest], Acc) ->
|
||||||
case emqx_client:publish(ClientPid, Msg) of
|
case emqx_client:publish(ClientPid, Msg) of
|
||||||
{ok, PktId} when Rest =:= [] ->
|
{ok, PktId} when Rest =:= [] ->
|
||||||
%% last one sent
|
%% last one sent
|
||||||
|
@ -119,9 +119,6 @@ send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Re
|
||||||
{ok, Ref};
|
{ok, Ref};
|
||||||
{ok, PktId} ->
|
{ok, PktId} ->
|
||||||
send(Conn, Rest, [PktId | Acc]);
|
send(Conn, Rest, [PktId | Acc]);
|
||||||
{error, {_PacketId, inflight_full}} ->
|
|
||||||
timer:sleep(10),
|
|
||||||
send(Conn, Batch, Acc);
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
%% NOTE: There is no partial sucess of a batch and recover from the middle
|
%% NOTE: There is no partial sucess of a batch and recover from the middle
|
||||||
%% only to retry all messages in one batch
|
%% only to retry all messages in one batch
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
%% For test cases
|
%% For test cases
|
||||||
-export([pause/1, resume/1]).
|
-export([pause/1, resume/1]).
|
||||||
|
|
||||||
-export([initialized/3, waiting_for_connack/3, connected/3]).
|
-export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]).
|
||||||
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
|
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
|
||||||
|
|
||||||
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
|
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
|
||||||
|
@ -790,19 +790,18 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) ->
|
||||||
connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
|
connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
|
||||||
State = #state{inflight = Inflight, last_packet_id = PacketId})
|
State = #state{inflight = Inflight, last_packet_id = PacketId})
|
||||||
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
|
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
|
||||||
case emqx_inflight:is_full(Inflight) of
|
|
||||||
true ->
|
|
||||||
{keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]};
|
|
||||||
false ->
|
|
||||||
Msg1 = Msg#mqtt_msg{packet_id = PacketId},
|
Msg1 = Msg#mqtt_msg{packet_id = PacketId},
|
||||||
case send(Msg1, State) of
|
case send(Msg1, State) of
|
||||||
{ok, NewState} ->
|
{ok, NewState} ->
|
||||||
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
|
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
|
||||||
{keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}),
|
State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}),
|
||||||
[{reply, From, {ok, PacketId}}]};
|
Actions = [{reply, From, {ok, PacketId}}],
|
||||||
|
case emqx_inflight:is_full(Inflight1) of
|
||||||
|
true -> {next_state, inflight_full, State1, Actions};
|
||||||
|
false -> {keep_state, State1, Actions}
|
||||||
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
|
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
|
||||||
end
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics},
|
connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics},
|
||||||
|
@ -869,18 +868,8 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties,
|
||||||
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
|
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
|
||||||
publish_process(?QOS_2, Packet, State);
|
publish_process(?QOS_2, Packet, State);
|
||||||
|
|
||||||
connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties),
|
connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
|
||||||
State = #state{inflight = Inflight}) ->
|
{keep_state, delete_inflight(PubAck, State)};
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
||||||
{value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
|
|
||||||
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
|
|
||||||
reason_code => ReasonCode,
|
|
||||||
properties => Properties}),
|
|
||||||
{keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
|
|
||||||
none ->
|
|
||||||
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
|
|
||||||
{keep_state, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) ->
|
connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) ->
|
||||||
send_puback(?PUBREL_PACKET(PacketId),
|
send_puback(?PUBREL_PACKET(PacketId),
|
||||||
|
@ -911,18 +900,8 @@ connected(cast, ?PUBREL_PACKET(PacketId),
|
||||||
{keep_state, State}
|
{keep_state, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
|
connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
|
||||||
State = #state{inflight = Inflight}) ->
|
{keep_state, delete_inflight(PubComp, State)};
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
||||||
{value, {pubrel, _PacketId, _Ts}} ->
|
|
||||||
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
|
|
||||||
reason_code => ReasonCode,
|
|
||||||
properties => Properties}),
|
|
||||||
{keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
|
|
||||||
none ->
|
|
||||||
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
|
|
||||||
{keep_state, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
|
connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
|
||||||
State = #state{subscriptions = _Subscriptions}) ->
|
State = #state{subscriptions = _Subscriptions}) ->
|
||||||
|
@ -998,6 +977,17 @@ connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef,
|
||||||
connected(EventType, EventContent, Data) ->
|
connected(EventType, EventContent, Data) ->
|
||||||
handle_event(EventType, EventContent, connected, Data).
|
handle_event(EventType, EventContent, connected, Data).
|
||||||
|
|
||||||
|
inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
|
||||||
|
{keep_state_and_data, [postpone]};
|
||||||
|
inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
|
||||||
|
delete_inflight_when_full(PubAck, State);
|
||||||
|
inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
|
||||||
|
delete_inflight_when_full(PubComp, State);
|
||||||
|
inflight_full(EventType, EventContent, Data) ->
|
||||||
|
%% inflight_full is a sub-state of connected state,
|
||||||
|
%% delegate all other events to connected state.
|
||||||
|
connected(EventType, EventContent, Data).
|
||||||
|
|
||||||
should_ping(Sock) ->
|
should_ping(Sock) ->
|
||||||
case emqx_client_sock:getstat(Sock, [send_oct]) of
|
case emqx_client_sock:getstat(Sock, [send_oct]) of
|
||||||
{ok, [{send_oct, Val}]} ->
|
{ok, [{send_oct, Val}]} ->
|
||||||
|
@ -1061,6 +1051,38 @@ code_change(_Vsn, State, Data, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties),
|
||||||
|
State = #state{inflight = Inflight}) ->
|
||||||
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
|
{value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
|
||||||
|
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
|
||||||
|
reason_code => ReasonCode,
|
||||||
|
properties => Properties}),
|
||||||
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
||||||
|
none ->
|
||||||
|
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
|
||||||
|
State
|
||||||
|
end;
|
||||||
|
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
|
||||||
|
State = #state{inflight = Inflight}) ->
|
||||||
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
|
{value, {pubrel, _PacketId, _Ts}} ->
|
||||||
|
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
|
||||||
|
reason_code => ReasonCode,
|
||||||
|
properties => Properties}),
|
||||||
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
||||||
|
none ->
|
||||||
|
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
delete_inflight_when_full(Packet, State0) ->
|
||||||
|
State = #state{inflight = Inflight} = delete_inflight(Packet, State0),
|
||||||
|
case emqx_inflight:is_full(Inflight) of
|
||||||
|
true -> {keep_state, State};
|
||||||
|
false -> {next_state, connected, State}
|
||||||
|
end.
|
||||||
|
|
||||||
%% Subscribe to response topic.
|
%% Subscribe to response topic.
|
||||||
-spec(sub_response_topic(client(), qos(), topic()) -> ok).
|
-spec(sub_response_topic(client(), qos(), topic()) -> ok).
|
||||||
sub_response_topic(Client, QoS, Topic) when is_binary(Topic) ->
|
sub_response_topic(Client, QoS, Topic) when is_binary(Topic) ->
|
||||||
|
@ -1222,11 +1244,12 @@ ensure_ack_timer(State = #state{ack_timer = undefined,
|
||||||
ensure_ack_timer(State) -> State.
|
ensure_ack_timer(State) -> State.
|
||||||
|
|
||||||
ensure_retry_timer(State = #state{retry_interval = Interval}) ->
|
ensure_retry_timer(State = #state{retry_interval = Interval}) ->
|
||||||
ensure_retry_timer(Interval, State).
|
do_ensure_retry_timer(Interval, State).
|
||||||
ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
|
|
||||||
|
do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
|
||||||
when Interval > 0 ->
|
when Interval > 0 ->
|
||||||
State#state{retry_timer = erlang:start_timer(Interval, self(), retry)};
|
State#state{retry_timer = erlang:start_timer(Interval, self(), retry)};
|
||||||
ensure_retry_timer(_Interval, State) ->
|
do_ensure_retry_timer(_Interval, State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
retry_send(State = #state{inflight = Inflight}) ->
|
retry_send(State = #state{inflight = Inflight}) ->
|
||||||
|
@ -1243,7 +1266,7 @@ retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interv
|
||||||
{ok, NewState} -> retry_send(Msgs, Now, NewState);
|
{ok, NewState} -> retry_send(Msgs, Now, NewState);
|
||||||
{error, Error} -> {stop, Error}
|
{error, Error} -> {stop, Error}
|
||||||
end;
|
end;
|
||||||
false -> {keep_state, ensure_retry_timer(Interval - Diff, State)}
|
false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId},
|
retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId},
|
||||||
|
|
|
@ -28,13 +28,8 @@ send_and_ack_test() ->
|
||||||
fun(Pid) -> Pid ! stop end),
|
fun(Pid) -> Pid ! stop end),
|
||||||
meck:expect(emqx_client, publish, 2,
|
meck:expect(emqx_client, publish, 2,
|
||||||
fun(Client, Msg) ->
|
fun(Client, Msg) ->
|
||||||
case rand:uniform(200) of
|
|
||||||
1 ->
|
|
||||||
{error, {dummy, inflight_full}};
|
|
||||||
_ ->
|
|
||||||
Client ! {publish, Msg},
|
Client ! {publish, Msg},
|
||||||
{ok, Msg} %% as packet id
|
{ok, Msg} %% as packet id
|
||||||
end
|
|
||||||
end),
|
end),
|
||||||
try
|
try
|
||||||
Max = 100,
|
Max = 100,
|
||||||
|
|
Loading…
Reference in New Issue