diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl index 590fbabb7..486f3206a 100644 --- a/src/emqx_bridge_mqtt.erl +++ b/src/emqx_bridge_mqtt.erl @@ -110,7 +110,7 @@ safe_stop(Pid, StopF, Timeout) -> send(Conn, Batch) -> send(Conn, Batch, []). -send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest] = Batch, Acc) -> +send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest], Acc) -> case emqx_client:publish(ClientPid, Msg) of {ok, PktId} when Rest =:= [] -> %% last one sent @@ -119,9 +119,6 @@ send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Re {ok, Ref}; {ok, PktId} -> send(Conn, Rest, [PktId | Acc]); - {error, {_PacketId, inflight_full}} -> - timer:sleep(10), - send(Conn, Batch, Acc); {error, Reason} -> %% NOTE: There is no partial sucess of a batch and recover from the middle %% only to retry all messages in one batch diff --git a/src/emqx_client.erl b/src/emqx_client.erl index e29e50552..2275164a9 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -38,7 +38,7 @@ %% For test cases -export([pause/1, resume/1]). --export([initialized/3, waiting_for_connack/3, connected/3]). +-export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). -export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, @@ -743,12 +743,12 @@ waiting_for_connack(EventType, EventContent, State) -> false -> {stop, connack_timeout} end. -connected({call, From}, subscriptions, State = #state{subscriptions = Subscriptions}) -> - {keep_state, State, [{reply, From, maps:to_list(Subscriptions)}]}; +connected({call, From}, subscriptions, #state{subscriptions = Subscriptions}) -> + {keep_state_and_data, [{reply, From, maps:to_list(Subscriptions)}]}; connected({call, From}, info, State) -> Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))), - {keep_state, State, [{reply, From, Info}]}; + {keep_state_and_data, [{reply, From, Info}]}; connected({call, From}, pause, State) -> {keep_state, State#state{paused = true}, [{reply, From, ok}]}; @@ -756,11 +756,11 @@ connected({call, From}, pause, State) -> connected({call, From}, resume, State) -> {keep_state, State#state{paused = false}, [{reply, From, ok}]}; -connected({call, From}, get_properties, State = #state{properties = Properties}) -> - {keep_state, State, [{reply, From, Properties}]}; +connected({call, From}, get_properties, #state{properties = Properties}) -> + {keep_state_and_data, [{reply, From, Properties}]}; -connected({call, From}, client_id, State = #state{client_id = ClientId}) -> - {keep_state, State, [{reply, From, ClientId}]}; +connected({call, From}, client_id, #state{client_id = ClientId}) -> + {keep_state_and_data, [{reply, From, ClientId}]}; connected({call, From}, {set_request_handler, RequestHandler}, State) -> {keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]}; @@ -790,20 +790,19 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) -> connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, State = #state{inflight = Inflight, last_packet_id = PacketId}) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> - case emqx_inflight:is_full(Inflight) of - true -> - {keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]}; - false -> - Msg1 = Msg#mqtt_msg{packet_id = PacketId}, - case send(Msg1, State) of - {ok, NewState} -> - Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), - {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), - [{reply, From, {ok, PacketId}}]}; - {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} - end - end; + Msg1 = Msg#mqtt_msg{packet_id = PacketId}, + case send(Msg1, State) of + {ok, NewState} -> + Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), + State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}), + Actions = [{reply, From, {ok, PacketId}}], + case emqx_inflight:is_full(Inflight1) of + true -> {next_state, inflight_full, State1, Actions}; + false -> {keep_state, State1, Actions} + end; + {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} + end; connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics}, State = #state{last_packet_id = PacketId}) -> @@ -844,8 +843,8 @@ connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) -> connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State); -connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) -> - {keep_state, State}; +connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) -> + keep_state_and_data; connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload), State) when Properties =/= undefined -> @@ -869,18 +868,8 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> publish_process(?QOS_2, Packet, State); -connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), - State = #state{inflight = Inflight}) -> - case emqx_inflight:lookup(PacketId, Inflight) of - {value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} -> - ok = eval_msg_handler(State, puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}), - {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; - none -> - emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]), - {keep_state, State} - end; +connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) -> + {keep_state, delete_inflight(PubAck, State)}; connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -> send_puback(?PUBREL_PACKET(PacketId), @@ -908,21 +897,11 @@ connected(cast, ?PUBREL_PACKET(PacketId), end; error -> emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]), - {keep_state, State} + keep_state_and_data end; -connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), - State = #state{inflight = Inflight}) -> - case emqx_inflight:lookup(PacketId, Inflight) of - {value, {pubrel, _PacketId, _Ts}} -> - ok = eval_msg_handler(State, puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}), - {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; - none -> - emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]), - {keep_state, State} - end; +connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) -> + {keep_state, delete_inflight(PubComp, State)}; connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), State = #state{subscriptions = _Subscriptions}) -> @@ -931,7 +910,8 @@ connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), %%TODO: Merge reason codes to subscriptions? Reply = {ok, Properties, ReasonCodes}, {keep_state, NewState, [{reply, From, Reply}]}; - false -> {keep_state, State} + false -> + keep_state_and_data end; connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), @@ -944,16 +924,18 @@ connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), end, Subscriptions, Topics), {keep_state, NewState#state{subscriptions = Subscriptions1}, [{reply, From, {ok, Properties, ReasonCodes}}]}; - false -> {keep_state, State} + false -> + keep_state_and_data end; -connected(cast, ?PACKET(?PINGRESP), State = #state{pending_calls = []}) -> - {keep_state, State}; +connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) -> + keep_state_and_data; connected(cast, ?PACKET(?PINGRESP), State) -> case take_call(ping, State) of {value, #call{from = From}, NewState} -> {keep_state, NewState, [{reply, From, pong}]}; - false -> {keep_state, State} + false -> + keep_state_and_data end; connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> @@ -998,14 +980,16 @@ connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef, connected(EventType, EventContent, Data) -> handle_event(EventType, EventContent, connected, Data). -should_ping(Sock) -> - case emqx_client_sock:getstat(Sock, [send_oct]) of - {ok, [{send_oct, Val}]} -> - OldVal = get(send_oct), put(send_oct, Val), - OldVal == undefined orelse OldVal == Val; - Error = {error, _Reason} -> - Error - end. +inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> + {keep_state_and_data, [postpone]}; +inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) -> + delete_inflight_when_full(PubAck, State); +inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) -> + delete_inflight_when_full(PubComp, State); +inflight_full(EventType, EventContent, Data) -> + %% inflight_full is a sub-state of connected state, + %% delegate all other events to connected state. + connected(EventType, EventContent, Data). handle_event({call, From}, stop, _StateName, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; @@ -1028,17 +1012,17 @@ handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]), {stop, {shutdown, Reason}, State}; -handle_event(info, {inet_reply, _Sock, ok}, _, State) -> - {keep_state, State}; +handle_event(info, {inet_reply, _Sock, ok}, _, _State) -> + keep_state_and_data; handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]), {stop, {shutdown, Reason}, State}; -handle_event(EventType, EventContent, StateName, StateData) -> +handle_event(EventType, EventContent, StateName, _StateData) -> emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)", [StateName, EventType, EventContent]), - {keep_state, StateData}. + keep_state_and_data. %% Mandatory callback functions terminate(Reason, _StateName, State = #state{socket = Socket}) -> @@ -1061,6 +1045,47 @@ code_change(_Vsn, State, Data, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +should_ping(Sock) -> + case emqx_client_sock:getstat(Sock, [send_oct]) of + {ok, [{send_oct, Val}]} -> + OldVal = get(send_oct), put(send_oct, Val), + OldVal == undefined orelse OldVal == Val; + Error = {error, _Reason} -> + Error + end. + +delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), + State = #state{inflight = Inflight}) -> + case emqx_inflight:lookup(PacketId, Inflight) of + {value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} -> + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), + State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; + none -> + emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]), + State + end; +delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), + State = #state{inflight = Inflight}) -> + case emqx_inflight:lookup(PacketId, Inflight) of + {value, {pubrel, _PacketId, _Ts}} -> + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), + State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; + none -> + emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]), + State + end. + +delete_inflight_when_full(Packet, State0) -> + State = #state{inflight = Inflight} = delete_inflight(Packet, State0), + case emqx_inflight:is_full(Inflight) of + true -> {keep_state, State}; + false -> {next_state, connected, State} + end. + %% Subscribe to response topic. -spec(sub_response_topic(client(), qos(), topic()) -> ok). sub_response_topic(Client, QoS, Topic) when is_binary(Topic) -> @@ -1222,11 +1247,12 @@ ensure_ack_timer(State = #state{ack_timer = undefined, ensure_ack_timer(State) -> State. ensure_retry_timer(State = #state{retry_interval = Interval}) -> - ensure_retry_timer(Interval, State). -ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) + do_ensure_retry_timer(Interval, State). + +do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) when Interval > 0 -> State#state{retry_timer = erlang:start_timer(Interval, self(), retry)}; -ensure_retry_timer(_Interval, State) -> +do_ensure_retry_timer(_Interval, State) -> State. retry_send(State = #state{inflight = Inflight}) -> @@ -1243,7 +1269,7 @@ retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interv {ok, NewState} -> retry_send(Msgs, Now, NewState); {error, Error} -> {stop, Error} end; - false -> {keep_state, ensure_retry_timer(Interval - Diff, State)} + false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)} end. retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId}, diff --git a/test/emqx_bridge_mqtt_tests.erl b/test/emqx_bridge_mqtt_tests.erl index 7c094b957..e9eb0b0a0 100644 --- a/test/emqx_bridge_mqtt_tests.erl +++ b/test/emqx_bridge_mqtt_tests.erl @@ -28,13 +28,8 @@ send_and_ack_test() -> fun(Pid) -> Pid ! stop end), meck:expect(emqx_client, publish, 2, fun(Client, Msg) -> - case rand:uniform(200) of - 1 -> - {error, {dummy, inflight_full}}; - _ -> - Client ! {publish, Msg}, - {ok, Msg} %% as packet id - end + Client ! {publish, Msg}, + {ok, Msg} %% as packet id end), try Max = 100, diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl index d0da45fba..038180b5b 100644 --- a/test/emqx_misc_tests.erl +++ b/test/emqx_misc_tests.erl @@ -47,12 +47,14 @@ timer_cancel_flush_test() -> end. shutdown_disabled_test() -> + ok = drain(), self() ! foo, ?assertEqual(continue, conn_proc_mng_policy(0)), receive foo -> ok end, ?assertEqual(hibernate, conn_proc_mng_policy(0)). message_queue_too_long_test() -> + ok = drain(), self() ! foo, self() ! bar, ?assertEqual({shutdown, message_queue_too_long}, @@ -63,3 +65,18 @@ message_queue_too_long_test() -> conn_proc_mng_policy(L) -> emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}). + +%% drain self() msg queue for deterministic test behavior +drain() -> + _ = drain([]), % maybe log + ok. + +drain(Acc) -> + receive + Msg -> + drain([Msg | Acc]) + after + 0 -> + lists:reverse(Acc) + end. +