Fix emqx_portal_mqtt_tests start function
This commit is contained in:
parent
d4495fd8e7
commit
921a45a505
|
@ -1382,4 +1382,3 @@ bump_last_packet_id(State = #state{last_packet_id = Id}) ->
|
||||||
-spec next_packet_id(packet_id()) -> packet_id().
|
-spec next_packet_id(packet_id()) -> packet_id().
|
||||||
next_packet_id(?MAX_PACKET_ID) -> 1;
|
next_packet_id(?MAX_PACKET_ID) -> 1;
|
||||||
next_packet_id(Id) -> Id + 1.
|
next_packet_id(Id) -> Id + 1.
|
||||||
|
|
||||||
|
|
|
@ -39,12 +39,20 @@
|
||||||
-define(ACKED(AnyPktId), {acked, AnyPktId}).
|
-define(ACKED(AnyPktId), {acked, AnyPktId}).
|
||||||
-define(STOP(Ref), {stop, Ref}).
|
-define(STOP(Ref), {stop, Ref}).
|
||||||
|
|
||||||
start(Config) ->
|
start(Config = #{address := Address}) ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
Parent = self(),
|
Parent = self(),
|
||||||
AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end),
|
AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end),
|
||||||
Handlers = make_hdlr(Parent, AckCollector, Ref),
|
Handlers = make_hdlr(Parent, AckCollector, Ref),
|
||||||
case emqx_client:start_link(Config#{msg_handler => Handlers, owner => AckCollector}) of
|
{Host, Port} = case string:tokens(Address, ":") of
|
||||||
|
[H] -> {H, 1883};
|
||||||
|
[H, P] -> {H, list_to_integer(P)}
|
||||||
|
end,
|
||||||
|
ClientConfig = Config#{msg_handler => Handlers,
|
||||||
|
owner => AckCollector,
|
||||||
|
host => Host,
|
||||||
|
port => Port},
|
||||||
|
case emqx_client:start_link(ClientConfig) of
|
||||||
{ok, Pid} ->
|
{ok, Pid} ->
|
||||||
case emqx_client:connect(Pid) of
|
case emqx_client:connect(Pid) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
|
@ -58,7 +66,7 @@ start(Config) ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
ok = stop(AckCollector, Pid),
|
ok = stop(Ref, #{ack_collector => AckCollector, client_pid => Pid}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -178,4 +186,3 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
|
||||||
Error -> throw(Error)
|
Error -> throw(Error)
|
||||||
end
|
end
|
||||||
end, Subscriptions).
|
end, Subscriptions).
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ send_and_ack_test() ->
|
||||||
try
|
try
|
||||||
Max = 100,
|
Max = 100,
|
||||||
Batch = lists:seq(1, Max),
|
Batch = lists:seq(1, Max),
|
||||||
{ok, Ref, Conn} = emqx_portal_mqtt:start(#{}),
|
{ok, Ref, Conn} = emqx_portal_mqtt:start(#{address => "127.0.0.1:1883"}),
|
||||||
%% return last packet id as batch reference
|
%% return last packet id as batch reference
|
||||||
{ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch),
|
{ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch),
|
||||||
%% expect batch ack
|
%% expect batch ack
|
||||||
|
@ -57,4 +57,3 @@ fake_client(#{puback := PubAckCallback} = Hdlr) ->
|
||||||
stop ->
|
stop ->
|
||||||
exit(normal)
|
exit(normal)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue