Deleted batch publish support in emqx_portal_client

eqmx_portal_mqtt has to do single message publish calls for now
Also fix a bug in emqx_portal_mqtt ack collector
This commit is contained in:
spring2maz 2019-02-20 12:10:47 +01:00 committed by Gilbert Wong
parent 086a1d56b9
commit 1626cade28
6 changed files with 58 additions and 106 deletions

View File

@ -39,8 +39,6 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message
CT_SUITES = emqx_portal
CT_NODE_NAME = emqxct@127.0.0.1 CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

View File

@ -35,7 +35,6 @@
-export([pubcomp/2, pubcomp/3, pubcomp/4]). -export([pubcomp/2, pubcomp/3, pubcomp/4]).
-export([subscriptions/1]). -export([subscriptions/1]).
-export([info/1, stop/1]). -export([info/1, stop/1]).
-export([next_packet_id/1, next_packet_id/2]).
%% For test cases %% For test cases
-export([pause/1, resume/1]). -export([pause/1, resume/1]).
@ -390,7 +389,7 @@ publish(Client, Topic, Properties, Payload, Opts)
props = Properties, props = Properties,
payload = iolist_to_binary(Payload)}). payload = iolist_to_binary(Payload)}).
-spec(publish(client(), #mqtt_msg{} | [#mqtt_msg{}]) -> ok | {ok, packet_id()} | {error, term()}). -spec(publish(client(), #mqtt_msg{}) -> ok | {ok, packet_id()} | {error, term()}).
publish(Client, Msg) -> publish(Client, Msg) ->
gen_statem:call(Client, {publish, Msg}). gen_statem:call(Client, {publish, Msg}).
@ -422,19 +421,6 @@ disconnect(Client, ReasonCode) ->
disconnect(Client, ReasonCode, Properties) -> disconnect(Client, ReasonCode, Properties) ->
gen_statem:call(Client, {disconnect, ReasonCode, Properties}). gen_statem:call(Client, {disconnect, ReasonCode, Properties}).
-spec next_packet_id(packet_id()) -> packet_id().
next_packet_id(?MAX_PACKET_ID) -> 1;
next_packet_id(Id) -> Id + 1.
-spec next_packet_id(packet_id(), integer()) -> packet_id().
next_packet_id(Id, Bump) ->
true = (Bump < ?MAX_PACKET_ID div 2), %% assert
N = Id + Bump,
case N > ?MAX_PACKET_ID of
true -> N - ?MAX_PACKET_ID;
false -> N
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% For test cases %% For test cases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -801,26 +787,23 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) ->
{stop_and_reply, Reason, [{reply, From, Error}]} {stop_and_reply, Reason, [{reply, From, Error}]}
end; end;
connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, State) connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> State = #state{inflight = Inflight, last_packet_id = PacketId})
connected({call, From}, {publish, [Msg]}, State); when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
case emqx_inflight:is_full(Inflight) of
%% when publishing a batch, {ok, BasePacketId} is returned, true ->
%% following packet ids for the batch tail are mod (1 bsl 16) consecutive {keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]};
connected({call, From}, {publish, Msgs}, false ->
State = #state{inflight = Inflight, last_packet_id = PacketId}) when is_list(Msgs) -> Msg1 = Msg#mqtt_msg{packet_id = PacketId},
%% NOTE: to ensure API call atomicity, inflight buffer may overflow case send(Msg1, State) of
case emqx_inflight:is_full(Inflight) of {ok, NewState} ->
true -> Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
{keep_state, State, [{reply, From, {error, inflight_full}}]}; {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}),
false -> [{reply, From, {ok, PacketId}}]};
case send_batch(assign_packet_id(Msgs, PacketId), State) of {error, Reason} ->
{ok, NewState} -> {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
{keep_state, ensure_retry_timer(NewState), [{reply, From, {ok, PacketId}}]}; end
{error, Reason} -> end;
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
end
end;
connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics}, connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics},
State = #state{last_packet_id = PacketId}) -> State = #state{last_packet_id = PacketId}) ->
@ -1349,24 +1332,13 @@ send_puback(Packet, State) ->
{error, Reason} -> {stop, {shutdown, Reason}} {error, Reason} -> {stop, {shutdown, Reason}}
end. end.
send_batch([], State) -> {ok, State};
send_batch([Msg = #mqtt_msg{packet_id = PacketId} | Rest],
State = #state{inflight = Inflight}) ->
case send(Msg, State) of
{ok, NewState} ->
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight),
send_batch(Rest, NewState#state{inflight = Inflight1});
{error, Reason} ->
{error, Reason}
end.
send(Msg, State) when is_record(Msg, mqtt_msg) -> send(Msg, State) when is_record(Msg, mqtt_msg) ->
send(msg_to_packet(Msg), State); send(msg_to_packet(Msg), State);
send(Packet, State = #state{socket = Sock, proto_ver = Ver}) send(Packet, State = #state{socket = Sock, proto_ver = Ver})
when is_record(Packet, mqtt_packet) -> when is_record(Packet, mqtt_packet) ->
Data = emqx_frame:serialize(Packet, #{version => Ver}), Data = emqx_frame:serialize(Packet, #{version => Ver}),
emqx_logger:debug("SEND Data: ~p", [Data]), emqx_logger:debug("SEND Data: ~1000p", [Packet]),
case emqx_client_sock:send(Sock, Data) of case emqx_client_sock:send(Sock, Data) of
ok -> {ok, bump_last_packet_id(State)}; ok -> {ok, bump_last_packet_id(State)};
Error -> Error Error -> Error
@ -1402,15 +1374,12 @@ next_events(Packets) ->
[{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)].
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% packet_id generation and assignment %% packet_id generation
assign_packet_id(Msg = #mqtt_msg{}, Id) ->
Msg#mqtt_msg{packet_id = Id};
assign_packet_id([H | T], Id) ->
[assign_packet_id(H, Id) | assign_packet_id(T, next_packet_id(Id))];
assign_packet_id([], _Id) ->
[].
bump_last_packet_id(State = #state{last_packet_id = Id}) -> bump_last_packet_id(State = #state{last_packet_id = Id}) ->
State#state{last_packet_id = next_packet_id(Id)}. State#state{last_packet_id = next_packet_id(Id)}.
-spec next_packet_id(packet_id()) -> packet_id().
next_packet_id(?MAX_PACKET_ID) -> 1;
next_packet_id(Id) -> Id + 1.

View File

@ -34,7 +34,8 @@
%% Messages towards ack collector process %% Messages towards ack collector process
-define(RANGE(Min, Max), {Min, Max}). -define(RANGE(Min, Max), {Min, Max}).
-define(SENT(PktIdRange), {sent, PktIdRange}). -define(REF_IDS(Ref, Ids), {Ref, Ids}).
-define(SENT(RefIds), {sent, RefIds}).
-define(ACKED(AnyPktId), {acked, AnyPktId}). -define(ACKED(AnyPktId), {acked, AnyPktId}).
-define(STOP(Ref), {stop, Ref}). -define(STOP(Ref), {stop, Ref}).
@ -49,8 +50,6 @@ start(Config) ->
{ok, _} -> {ok, _} ->
try try
subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])), subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])),
%% ack collector is always a new pid every reconnect.
%% use it as a connection reference
{ok, Ref, #{ack_collector => AckCollector, {ok, Ref, #{ack_collector => AckCollector,
client_pid => Pid}} client_pid => Pid}}
catch catch
@ -100,16 +99,21 @@ safe_stop(Pid, StopF, Timeout) ->
exit(Pid, kill) exit(Pid, kill)
end. end.
send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, Batch) -> send(Conn, Batch) ->
case emqx_client:publish(ClientPid, Batch) of send(Conn, Batch, []).
{ok, BasePktId} ->
LastPktId = emqx_client:next_packet_id(BasePktId, length(Batch) - 1), send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest] = Batch, Acc) ->
AckCollector ! ?SENT(?RANGE(BasePktId, LastPktId)), case emqx_client:publish(ClientPid, Msg) of
%% return last pakcet id as batch reference {ok, PktId} when Rest =:= [] ->
{ok, LastPktId}; %% last one sent
Ref = make_ref(),
AckCollector ! ?SENT(?REF_IDS(Ref, lists:reverse([PktId | Acc]))),
{ok, Ref};
{ok, PktId} ->
send(Conn, Rest, [PktId | Acc]);
{error, {_PacketId, inflight_full}} -> {error, {_PacketId, inflight_full}} ->
timer:sleep(100), timer:sleep(100),
send(Conn, Batch); send(Conn, Batch, Acc);
{error, Reason} -> {error, Reason} ->
%% NOTE: There is no partial sucess of a batch and recover from the middle %% NOTE: There is no partial sucess of a batch and recover from the middle
%% only to retry all messages in one batch %% only to retry all messages in one batch
@ -126,9 +130,9 @@ ack_collector(Parent, ConnRef, Acked, Sent) ->
exit(normal); exit(normal);
?ACKED(PktId) -> ?ACKED(PktId) ->
match_acks(Parent, queue:in(PktId, Acked), Sent); match_acks(Parent, queue:in(PktId, Acked), Sent);
?SENT(Range) -> ?SENT(RefIds) ->
%% this message only happens per-batch, hence ++ is ok %% this message only happens per-batch, hence ++ is ok
match_acks(Parent, Acked, Sent ++ [Range]) match_acks(Parent, Acked, Sent ++ [RefIds])
after after
200 -> 200 ->
{Acked, Sent} {Acked, Sent}
@ -140,12 +144,14 @@ match_acks(Parent, Acked, Sent) ->
match_acks_1(Parent, queue:out(Acked), Sent). match_acks_1(Parent, queue:out(Acked), Sent).
match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent}; match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent};
match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, PktId) | Sent]) -> match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) ->
%% batch finished %% batch finished
ok = emqx_portal:handle_ack(Parent, PktId), ok = emqx_portal:handle_ack(Parent, Ref),
match_acks(Parent, Acked, Sent); match_acks(Parent, Acked, Sent);
match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, Max) | Sent]) -> match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) ->
match_acks(Parent, Acked, [?RANGE(emqx_client:next_packet_id(PktId), Max) | Sent]). %% one message finished, but not the whole batch
match_acks(Parent, Acked, [?REF_IDS(Ref, RestIds) | Sent]).
%% When puback for QoS-1 message is received from remote MQTT broker %% When puback for QoS-1 message is received from remote MQTT broker
%% NOTE: no support for QoS-2 %% NOTE: no support for QoS-2

View File

@ -39,7 +39,7 @@ init_per_suite(Config) ->
_ -> _ ->
ok ok
end, end,
emqx_ct_broker_helpers:run_setup_steps([{log_leve, info} | Config]). emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]).
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps(). emqx_ct_broker_helpers:run_teardown_steps().

View File

@ -18,7 +18,6 @@
send_and_ack_test() -> send_and_ack_test() ->
%% delegate from gen_rpc to rpc for unit test %% delegate from gen_rpc to rpc for unit test
Tester = self(),
meck:new(emqx_client, [passthrough, no_history]), meck:new(emqx_client, [passthrough, no_history]),
meck:expect(emqx_client, start_link, 1, meck:expect(emqx_client, start_link, 1,
fun(#{msg_handler := Hdlr}) -> fun(#{msg_handler := Hdlr}) ->
@ -28,14 +27,13 @@ send_and_ack_test() ->
meck:expect(emqx_client, stop, 1, meck:expect(emqx_client, stop, 1,
fun(Pid) -> Pid ! stop end), fun(Pid) -> Pid ! stop end),
meck:expect(emqx_client, publish, 2, meck:expect(emqx_client, publish, 2,
fun(_Conn, Msgs) -> fun(Client, Msg) ->
case rand:uniform(100) of case rand:uniform(200) of
1 -> 1 ->
{error, {dummy, inflight_full}}; {error, {dummy, inflight_full}};
_ -> _ ->
BaseId = hd(Msgs), Client ! {publish, Msg},
Tester ! {published, Msgs}, {ok, Msg} %% as packet id
{ok, BaseId}
end end
end), end),
try try
@ -44,38 +42,19 @@ send_and_ack_test() ->
{ok, Ref, Conn} = emqx_portal_mqtt:start(#{}), {ok, Ref, Conn} = emqx_portal_mqtt:start(#{}),
%% return last packet id as batch reference %% return last packet id as batch reference
{ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch), {ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch),
%% as if the remote broker replied with puback
ok = fake_pubacks(Conn),
%% expect batch ack %% expect batch ack
AckRef1= receive {batch_ack, Id} -> Id end, receive {batch_ack, AckRef} -> ok end,
%% asset received ack matches the batch ref returned in send API
?assertEqual(AckRef, AckRef1),
ok = emqx_portal_mqtt:stop(Ref, Conn) ok = emqx_portal_mqtt:stop(Ref, Conn)
after after
meck:unload(emqx_client) meck:unload(emqx_client)
end. end.
fake_pubacks(#{client_pid := Client}) -> fake_client(#{puback := PubAckCallback} = Hdlr) ->
#{puback := PubAckCallback} = get_hdlr(Client),
receive receive
{published, Msgs} -> {publish, PktId} ->
lists:foreach( PubAckCallback(#{packet_id => PktId, reason_code => ?RC_SUCCESS}),
fun(Id) ->
PubAckCallback(#{packet_id => Id, reason_code => ?RC_SUCCESS})
end, Msgs)
end.
get_hdlr(Client) ->
Client ! {get_hdlr, self()},
receive {hdr, Hdlr} -> Hdlr end.
fake_client(Hdlr) ->
receive
{get_hdlr, Pid} ->
Pid ! {hdr, Hdlr},
fake_client(Hdlr); fake_client(Hdlr);
stop -> stop ->
exit(normal) exit(normal)
end. end.

View File

@ -71,7 +71,7 @@ disturbance_test() ->
%% buffer should continue taking in messages when disconnected %% buffer should continue taking in messages when disconnected
buffer_when_disconnected_test_() -> buffer_when_disconnected_test_() ->
{timeout, 5000, fun test_buffer_when_disconnected/0}. {timeout, 10000, fun test_buffer_when_disconnected/0}.
test_buffer_when_disconnected() -> test_buffer_when_disconnected() ->
Ref = make_ref(), Ref = make_ref(),
@ -92,7 +92,7 @@ test_buffer_when_disconnected() ->
Receiver ! {portal, Pid}, Receiver ! {portal, Pid},
?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
Pid ! {disconnected, Ref, test}, Pid ! {disconnected, Ref, test},
?WAIT({'DOWN', SenderMref, process, Sender, normal}, 2000), ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000),
?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
ok = emqx_portal:stop(?PORTAL_REG_NAME). ok = emqx_portal:stop(?PORTAL_REG_NAME).