Relace emqx_client with emqtt
This commit is contained in:
parent
fb1b47de6e
commit
3f20bcf58f
1250
src/emqx_client.erl
1250
src/emqx_client.erl
File diff suppressed because it is too large
Load Diff
|
@ -1,110 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_client_sock).
|
||||
|
||||
-export([ connect/4
|
||||
, send/2
|
||||
, close/1
|
||||
]).
|
||||
|
||||
-export([ sockname/1
|
||||
, setopts/2
|
||||
, getstat/2
|
||||
]).
|
||||
|
||||
-export_type([socket/0, option/0]).
|
||||
|
||||
-record(ssl_socket, {tcp, ssl}).
|
||||
|
||||
-type(socket() :: inet:socket() | #ssl_socket{}).
|
||||
|
||||
-type(sockname() :: {inet:ip_address(), inet:port_number()}).
|
||||
|
||||
-type(option() :: gen_tcp:connect_option() | {ssl_opts, [ssl:ssl_option()]}).
|
||||
|
||||
-define(DEFAULT_TCP_OPTIONS, [binary, {packet, raw}, {active, false},
|
||||
{nodelay, true}, {reuseaddr, true}]).
|
||||
|
||||
-spec(connect(inet:ip_address() | inet:hostname(),
|
||||
inet:port_number(), [option()], timeout())
|
||||
-> {ok, socket()} | {error, term()}).
|
||||
connect(Host, Port, SockOpts, Timeout) ->
|
||||
TcpOpts = emqx_misc:merge_opts(?DEFAULT_TCP_OPTIONS,
|
||||
lists:keydelete(ssl_opts, 1, SockOpts)),
|
||||
case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of
|
||||
{ok, Sock} ->
|
||||
case lists:keyfind(ssl_opts, 1, SockOpts) of
|
||||
{ssl_opts, SslOpts} ->
|
||||
ssl_upgrade(Sock, SslOpts, Timeout);
|
||||
false -> {ok, Sock}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
ssl_upgrade(Sock, SslOpts, Timeout) ->
|
||||
TlsVersions = proplists:get_value(versions, SslOpts, []),
|
||||
Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)),
|
||||
SslOpts2 = emqx_misc:merge_opts(SslOpts, [{ciphers, Ciphers}]),
|
||||
case ssl:connect(Sock, SslOpts2, Timeout) of
|
||||
{ok, SslSock} ->
|
||||
ok = ssl:controlling_process(SslSock, self()),
|
||||
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
-spec(send(socket(), iodata()) -> ok | {error, einval | closed}).
|
||||
send(Sock, Data) when is_port(Sock) ->
|
||||
try erlang:port_command(Sock, Data) of
|
||||
true -> ok
|
||||
catch
|
||||
error:badarg -> {error, einval}
|
||||
end;
|
||||
send(#ssl_socket{ssl = SslSock}, Data) ->
|
||||
ssl:send(SslSock, Data).
|
||||
|
||||
-spec(close(socket()) -> ok).
|
||||
close(Sock) when is_port(Sock) ->
|
||||
gen_tcp:close(Sock);
|
||||
close(#ssl_socket{ssl = SslSock}) ->
|
||||
ssl:close(SslSock).
|
||||
|
||||
-spec(setopts(socket(), [gen_tcp:option() | ssl:socketoption()]) -> ok).
|
||||
setopts(Sock, Opts) when is_port(Sock) ->
|
||||
inet:setopts(Sock, Opts);
|
||||
setopts(#ssl_socket{ssl = SslSock}, Opts) ->
|
||||
ssl:setopts(SslSock, Opts).
|
||||
|
||||
-spec(getstat(socket(), [atom()])
|
||||
-> {ok, [{atom(), integer()}]} | {error, term()}).
|
||||
getstat(Sock, Options) when is_port(Sock) ->
|
||||
inet:getstat(Sock, Options);
|
||||
getstat(#ssl_socket{tcp = Sock}, Options) ->
|
||||
inet:getstat(Sock, Options).
|
||||
|
||||
-spec(sockname(socket()) -> {ok, sockname()} | {error, term()}).
|
||||
sockname(Sock) when is_port(Sock) ->
|
||||
inet:sockname(Sock);
|
||||
sockname(#ssl_socket{ssl = SslSock}) ->
|
||||
ssl:sockname(SslSock).
|
||||
|
||||
default_ciphers(TlsVersions) ->
|
||||
lists:foldl(
|
||||
fun(TlsVer, Ciphers) ->
|
||||
Ciphers ++ ssl:cipher_suites(all, TlsVer)
|
||||
end, [], TlsVersions).
|
||||
|
|
@ -35,8 +35,8 @@ end_per_suite(_Config) ->
|
|||
t_emqx_pubsub_api(_) ->
|
||||
emqx:start(),
|
||||
true = emqx:is_running(node()),
|
||||
{ok, C} = emqx_client:start_link([{host, "localhost"}, {client_id, "myclient"}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"}, {client_id, "myclient"}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
ClientId = <<"myclient">>,
|
||||
Topic = <<"mytopic">>,
|
||||
Payload = <<"Hello World">>,
|
||||
|
|
|
@ -40,7 +40,7 @@ set_special_configs(_App) -> ok.
|
|||
t_alarm_handler(_) ->
|
||||
with_connection(
|
||||
fun(Sock) ->
|
||||
emqx_client_sock:send(Sock,
|
||||
emqtt_sock:send(Sock,
|
||||
raw_send_serialize(
|
||||
?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
|
@ -52,7 +52,7 @@ t_alarm_handler(_) ->
|
|||
Topic1 = emqx_topic:systop(<<"alarms/alert">>),
|
||||
Topic2 = emqx_topic:systop(<<"alarms/clear">>),
|
||||
SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
|
||||
emqx_client_sock:send(Sock,
|
||||
emqtt_sock:send(Sock,
|
||||
raw_send_serialize(
|
||||
?SUBSCRIBE_PACKET(
|
||||
1,
|
||||
|
@ -86,13 +86,13 @@ t_alarm_handler(_) ->
|
|||
end).
|
||||
|
||||
with_connection(DoFun) ->
|
||||
{ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
|
||||
{ok, Sock} = emqtt_sock:connect({127, 0, 0, 1}, 1883,
|
||||
[binary, {packet, raw}, {active, false}],
|
||||
3000),
|
||||
try
|
||||
DoFun(Sock)
|
||||
after
|
||||
emqx_client_sock:close(Sock)
|
||||
emqtt_sock:close(Sock)
|
||||
end.
|
||||
|
||||
raw_send_serialize(Packet) ->
|
||||
|
@ -100,4 +100,3 @@ raw_send_serialize(Packet) ->
|
|||
|
||||
raw_recv_parse(Bin) ->
|
||||
emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ?MQTT_PROTO_V5})).
|
||||
|
||||
|
|
|
@ -94,10 +94,10 @@ t_cm(_) ->
|
|||
IdleTimeout = emqx_zone:get_env(external, idle_timeout, 30000),
|
||||
emqx_zone:set_env(external, idle_timeout, 1000),
|
||||
ClientId = <<"myclient">>,
|
||||
{ok, C} = emqx_client:start_link([{client_id, ClientId}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
{ok, C} = emqtt:start_link([{client_id, ClientId}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
#{client := #{client_id := ClientId}} = emqx_cm:get_chan_attrs(ClientId),
|
||||
emqx_client:subscribe(C, <<"mytopic">>, 0),
|
||||
emqtt:subscribe(C, <<"mytopic">>, 0),
|
||||
ct:sleep(1200),
|
||||
Stats = emqx_cm:get_chan_stats(ClientId),
|
||||
?assertEqual(1, proplists:get_value(subscriptions, Stats)),
|
||||
|
@ -114,55 +114,55 @@ t_cm_registry(_) ->
|
|||
emqx_ct_helpers:start_apps([]).
|
||||
|
||||
t_will_message(_Config) ->
|
||||
{ok, C1} = emqx_client:start_link([{clean_start, true},
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true},
|
||||
{will_topic, nth(3, ?TOPICS)},
|
||||
{will_payload, <<"client disconnected">>},
|
||||
{keepalive, 1}]),
|
||||
{ok, _} = emqx_client:connect(C1),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
|
||||
{ok, C2} = emqx_client:start_link(),
|
||||
{ok, _} = emqx_client:connect(C2),
|
||||
{ok, C2} = emqtt:start_link(),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
|
||||
{ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2),
|
||||
{ok, _, [2]} = emqtt:subscribe(C2, nth(3, ?TOPICS), 2),
|
||||
timer:sleep(5),
|
||||
ok = emqx_client:stop(C1),
|
||||
ok = emqtt:stop(C1),
|
||||
timer:sleep(5),
|
||||
?assertEqual(1, length(recv_msgs(1))),
|
||||
ok = emqx_client:disconnect(C2),
|
||||
ok = emqtt:disconnect(C2),
|
||||
ct:pal("Will message test succeeded").
|
||||
|
||||
t_offline_message_queueing(_) ->
|
||||
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
||||
{ok, C1} = emqtt:start_link([{clean_start, false},
|
||||
{client_id, <<"c1">>}]),
|
||||
{ok, _} = emqx_client:connect(C1),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
|
||||
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
|
||||
ok = emqx_client:disconnect(C1),
|
||||
{ok, C2} = emqx_client:start_link([{clean_start, true},
|
||||
{ok, _, [2]} = emqtt:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
|
||||
ok = emqtt:disconnect(C1),
|
||||
{ok, C2} = emqtt:start_link([{clean_start, true},
|
||||
{client_id, <<"c2">>}]),
|
||||
{ok, _} = emqx_client:connect(C2),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
|
||||
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
|
||||
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
|
||||
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
|
||||
ok = emqtt:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
|
||||
{ok, _} = emqtt:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
|
||||
{ok, _} = emqtt:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
|
||||
timer:sleep(10),
|
||||
emqx_client:disconnect(C2),
|
||||
{ok, C3} = emqx_client:start_link([{clean_start, false},
|
||||
emqtt:disconnect(C2),
|
||||
{ok, C3} = emqtt:start_link([{clean_start, false},
|
||||
{client_id, <<"c1">>}]),
|
||||
{ok, _} = emqx_client:connect(C3),
|
||||
{ok, _} = emqtt:connect(C3),
|
||||
|
||||
timer:sleep(10),
|
||||
emqx_client:disconnect(C3),
|
||||
emqtt:disconnect(C3),
|
||||
?assertEqual(3, length(recv_msgs(3))).
|
||||
|
||||
t_overlapping_subscriptions(_) ->
|
||||
{ok, C} = emqx_client:start_link([]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
{ok, C} = emqtt:start_link([]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
|
||||
{ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
|
||||
{ok, _, [2, 1]} = emqtt:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
|
||||
{nth(1, ?WILD_TOPICS), 1}]),
|
||||
timer:sleep(10),
|
||||
{ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
|
||||
{ok, _} = emqtt:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
|
||||
timer:sleep(10),
|
||||
|
||||
Num = length(recv_msgs(2)),
|
||||
|
@ -176,67 +176,67 @@ t_overlapping_subscriptions(_) ->
|
|||
matching overlapping subscription.");
|
||||
true -> ok
|
||||
end,
|
||||
emqx_client:disconnect(C).
|
||||
emqtt:disconnect(C).
|
||||
|
||||
%% t_keepalive_test(_) ->
|
||||
%% ct:print("Keepalive test starting"),
|
||||
%% {ok, C1, _} = emqx_client:start_link([{clean_start, true},
|
||||
%% {ok, C1, _} = emqtt:start_link([{clean_start, true},
|
||||
%% {keepalive, 5},
|
||||
%% {will_flag, true},
|
||||
%% {will_topic, nth(5, ?TOPICS)},
|
||||
%% %% {will_qos, 2},
|
||||
%% {will_payload, <<"keepalive expiry">>}]),
|
||||
%% ok = emqx_client:pause(C1),
|
||||
%% {ok, C2, _} = emqx_client:start_link([{clean_start, true},
|
||||
%% ok = emqtt:pause(C1),
|
||||
%% {ok, C2, _} = emqtt:start_link([{clean_start, true},
|
||||
%% {keepalive, 0}]),
|
||||
%% {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2),
|
||||
%% ok = emqx_client:disconnect(C2),
|
||||
%% {ok, _, [2]} = emqtt:subscribe(C2, nth(5, ?TOPICS), 2),
|
||||
%% ok = emqtt:disconnect(C2),
|
||||
%% ?assertEqual(1, length(recv_msgs(1))),
|
||||
%% ct:print("Keepalive test succeeded").
|
||||
|
||||
t_redelivery_on_reconnect(_) ->
|
||||
ct:pal("Redelivery on reconnect test starting"),
|
||||
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
||||
{ok, C1} = emqtt:start_link([{clean_start, false},
|
||||
{client_id, <<"c">>}]),
|
||||
{ok, _} = emqx_client:connect(C1),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
|
||||
{ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
|
||||
{ok, _, [2]} = emqtt:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
|
||||
timer:sleep(10),
|
||||
ok = emqx_client:pause(C1),
|
||||
{ok, _} = emqx_client:publish(C1, nth(2, ?TOPICS), <<>>,
|
||||
ok = emqtt:pause(C1),
|
||||
{ok, _} = emqtt:publish(C1, nth(2, ?TOPICS), <<>>,
|
||||
[{qos, 1}, {retain, false}]),
|
||||
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>,
|
||||
{ok, _} = emqtt:publish(C1, nth(4, ?TOPICS), <<>>,
|
||||
[{qos, 2}, {retain, false}]),
|
||||
timer:sleep(10),
|
||||
ok = emqx_client:disconnect(C1),
|
||||
ok = emqtt:disconnect(C1),
|
||||
?assertEqual(0, length(recv_msgs(2))),
|
||||
{ok, C2} = emqx_client:start_link([{clean_start, false},
|
||||
{ok, C2} = emqtt:start_link([{clean_start, false},
|
||||
{client_id, <<"c">>}]),
|
||||
{ok, _} = emqx_client:connect(C2),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
|
||||
timer:sleep(10),
|
||||
ok = emqx_client:disconnect(C2),
|
||||
ok = emqtt:disconnect(C2),
|
||||
?assertEqual(2, length(recv_msgs(2))).
|
||||
|
||||
%% t_subscribe_sys_topics(_) ->
|
||||
%% ct:print("Subscribe failure test starting"),
|
||||
%% {ok, C, _} = emqx_client:start_link([]),
|
||||
%% {ok, _, [2]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2),
|
||||
%% {ok, C, _} = emqtt:start_link([]),
|
||||
%% {ok, _, [2]} = emqtt:subscribe(C, <<"$SYS/#">>, 2),
|
||||
%% timer:sleep(10),
|
||||
%% ct:print("Subscribe failure test succeeded").
|
||||
|
||||
t_dollar_topics(_) ->
|
||||
ct:pal("$ topics test starting"),
|
||||
{ok, C} = emqx_client:start_link([{clean_start, true},
|
||||
{ok, C} = emqtt:start_link([{clean_start, true},
|
||||
{keepalive, 0}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
|
||||
{ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1),
|
||||
{ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
|
||||
{ok, _, [1]} = emqtt:subscribe(C, nth(6, ?WILD_TOPICS), 1),
|
||||
{ok, _} = emqtt:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
|
||||
<<"test">>, [{qos, 1}, {retain, false}]),
|
||||
timer:sleep(10),
|
||||
?assertEqual(0, length(recv_msgs(1))),
|
||||
ok = emqx_client:disconnect(C),
|
||||
ok = emqtt:disconnect(C),
|
||||
ct:pal("$ topics test succeeded").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -254,15 +254,15 @@ t_basic_with_props_v5(_) ->
|
|||
|
||||
t_basic(Opts) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
{ok, C} = emqx_client:start_link([{proto_ver, v4}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
|
||||
{ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
|
||||
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
|
||||
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
|
||||
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
|
||||
{ok, C} = emqtt:start_link([{proto_ver, v4}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
||||
{ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
|
||||
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
||||
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
||||
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
||||
?assertEqual(3, length(recv_msgs(3))),
|
||||
ok = emqx_client:disconnect(C).
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
|
|
|
@ -32,8 +32,8 @@ end_per_suite(_Config) ->
|
|||
%% t_flapping(_Config) ->
|
||||
%% process_flag(trap_exit, true),
|
||||
%% flapping_connect(5),
|
||||
%% {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
|
||||
%% {error, _} = emqx_client:connect(C),
|
||||
%% {ok, C} = emqtt:start_link([{client_id, <<"Client">>}]),
|
||||
%% {error, _} = emqtt:connect(C),
|
||||
%% receive
|
||||
%% {'EXIT', Client, _Reason} ->
|
||||
%% ct:log("receive exit signal, Client: ~p", [Client])
|
||||
|
@ -45,9 +45,9 @@ flapping_connect(Times) ->
|
|||
lists:foreach(fun do_connect/1, lists:seq(1, Times)).
|
||||
|
||||
do_connect(_I) ->
|
||||
{ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
ok = emqx_client:disconnect(C).
|
||||
{ok, C} = emqtt:start_link([{client_id, <<"Client">>}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
prepare_for_test() ->
|
||||
ok = emqx_zone:set_env(external, enable_flapping_detect, true),
|
||||
|
|
|
@ -36,10 +36,10 @@ end_per_suite(_Config) ->
|
|||
|
||||
t_mod_subscription(_) ->
|
||||
emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}]),
|
||||
{ok, C} = emqx_client:start_link([{host, "localhost"}, {client_id, "myclient"}, {username, "admin"}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"}, {client_id, "myclient"}, {username, "admin"}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
% ct:sleep(100),
|
||||
emqx_client:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
|
||||
emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
|
||||
receive
|
||||
{publish, #{topic := Topic, payload := Payload}} ->
|
||||
?assertEqual(<<"connected/myclient/admin">>, Topic),
|
||||
|
@ -47,5 +47,5 @@ t_mod_subscription(_) ->
|
|||
after 100 ->
|
||||
ct:fail("no_message")
|
||||
end,
|
||||
ok = emqx_client:disconnect(C),
|
||||
ok = emqtt:disconnect(C),
|
||||
emqx_mod_subscription:unload([]).
|
||||
|
|
|
@ -24,28 +24,28 @@
|
|||
-type topic() :: emqx_topic:topic().
|
||||
-type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()).
|
||||
|
||||
-spec start_link(topic(), qos(), handler(), emqx_client:options()) ->
|
||||
-spec start_link(topic(), qos(), handler(), emqtt:options()) ->
|
||||
{ok, pid()} | {error, any()}.
|
||||
start_link(RequestTopic, QoS, RequestHandler, Options0) ->
|
||||
Parent = self(),
|
||||
MsgHandler = make_msg_handler(RequestHandler, Parent),
|
||||
Options = [{msg_handler, MsgHandler} | Options0],
|
||||
case emqx_client:start_link(Options) of
|
||||
case emqtt:start_link(Options) of
|
||||
{ok, Pid} ->
|
||||
{ok, _} = emqx_client:connect(Pid),
|
||||
{ok, _} = emqtt:connect(Pid),
|
||||
try subscribe(Pid, RequestTopic, QoS) of
|
||||
ok -> {ok, Pid};
|
||||
{error, _} = Error -> Error
|
||||
catch
|
||||
C : E : S ->
|
||||
emqx_client:stop(Pid),
|
||||
emqtt:stop(Pid),
|
||||
{error, {C, E, S}}
|
||||
end;
|
||||
{error, _} = Error -> Error
|
||||
end.
|
||||
|
||||
stop(Pid) ->
|
||||
emqx_client:disconnect(Pid).
|
||||
emqtt:disconnect(Pid).
|
||||
|
||||
make_msg_handler(RequestHandler, Parent) ->
|
||||
#{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end,
|
||||
|
@ -75,11 +75,11 @@ handle_msg(ReqMsg, RequestHandler, Parent) ->
|
|||
end.
|
||||
|
||||
send_response(Msg) ->
|
||||
%% This function is evaluated by emqx_client itself.
|
||||
%% This function is evaluated by emqtt itself.
|
||||
%% hence delegate to another temp process for the loopback gen_statem call.
|
||||
Client = self(),
|
||||
_ = spawn_link(fun() ->
|
||||
case emqx_client:publish(Client, Msg) of
|
||||
case emqtt:publish(Client, Msg) of
|
||||
ok -> ok;
|
||||
{ok, _} -> ok;
|
||||
{error, Reason} -> exit({failed_to_publish_response, Reason})
|
||||
|
@ -89,6 +89,6 @@ send_response(Msg) ->
|
|||
|
||||
subscribe(Client, Topic, QoS) ->
|
||||
{ok, _Props, _QoS} =
|
||||
emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
|
||||
emqtt:subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
|
||||
{nl, true}, {qos, QoS}]}]),
|
||||
ok.
|
||||
|
|
|
@ -24,15 +24,15 @@ start_link(ResponseTopic, QoS, Options0) ->
|
|||
Parent = self(),
|
||||
MsgHandler = make_msg_handler(Parent),
|
||||
Options = [{msg_handler, MsgHandler} | Options0],
|
||||
case emqx_client:start_link(Options) of
|
||||
case emqtt:start_link(Options) of
|
||||
{ok, Pid} ->
|
||||
{ok, _} = emqx_client:connect(Pid),
|
||||
{ok, _} = emqtt:connect(Pid),
|
||||
try subscribe(Pid, ResponseTopic, QoS) of
|
||||
ok -> {ok, Pid};
|
||||
{error, _} = Error -> Error
|
||||
catch
|
||||
C : E : S ->
|
||||
emqx_client:stop(Pid),
|
||||
emqtt:stop(Pid),
|
||||
{error, {C, E, S}}
|
||||
end;
|
||||
{error, _} = Error -> Error
|
||||
|
@ -49,17 +49,17 @@ send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) ->
|
|||
props = Props,
|
||||
payload = Payload
|
||||
},
|
||||
case emqx_client:publish(Client, Msg) of
|
||||
case emqtt:publish(Client, Msg) of
|
||||
ok -> ok; %% QoS = 0
|
||||
{ok, _} -> ok;
|
||||
{error, _} = E -> E
|
||||
end.
|
||||
|
||||
stop(Pid) ->
|
||||
emqx_client:disconnect(Pid).
|
||||
emqtt:disconnect(Pid).
|
||||
|
||||
subscribe(Client, Topic, QoS) ->
|
||||
case emqx_client:subscribe(Client, Topic, QoS) of
|
||||
case emqtt:subscribe(Client, Topic, QoS) of
|
||||
{ok, _, _} -> ok;
|
||||
{error, _} = Error -> Error
|
||||
end.
|
||||
|
|
|
@ -150,24 +150,24 @@ t_not_so_sticky(_) ->
|
|||
ok = ensure_config(sticky),
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
{ok, C1} = emqx_client:start_link([{client_id, ClientId1}]),
|
||||
{ok, _} = emqx_client:connect(C1),
|
||||
{ok, C2} = emqx_client:start_link([{client_id, ClientId2}]),
|
||||
{ok, _} = emqx_client:connect(C2),
|
||||
{ok, C1} = emqtt:start_link([{client_id, ClientId1}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
{ok, C2} = emqtt:start_link([{client_id, ClientId2}]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
|
||||
emqx_client:subscribe(C1, {<<"$share/group1/foo/bar">>, 0}),
|
||||
emqtt:subscribe(C1, {<<"$share/group1/foo/bar">>, 0}),
|
||||
timer:sleep(50),
|
||||
emqx_client:publish(C2, <<"foo/bar">>, <<"hello1">>),
|
||||
emqtt:publish(C2, <<"foo/bar">>, <<"hello1">>),
|
||||
?assertMatch([#{payload := <<"hello1">>}], recv_msgs(1)),
|
||||
|
||||
emqx_client:unsubscribe(C1, <<"$share/group1/foo/bar">>),
|
||||
emqtt:unsubscribe(C1, <<"$share/group1/foo/bar">>),
|
||||
timer:sleep(50),
|
||||
emqx_client:subscribe(C1, {<<"$share/group1/foo/#">>, 0}),
|
||||
emqtt:subscribe(C1, {<<"$share/group1/foo/#">>, 0}),
|
||||
timer:sleep(50),
|
||||
emqx_client:publish(C2, <<"foo/bar">>, <<"hello2">>),
|
||||
emqtt:publish(C2, <<"foo/bar">>, <<"hello2">>),
|
||||
?assertMatch([#{payload := <<"hello2">>}], recv_msgs(1)),
|
||||
emqx_client:disconnect(C1),
|
||||
emqx_client:disconnect(C2),
|
||||
emqtt:disconnect(C1),
|
||||
emqtt:disconnect(C2),
|
||||
ok.
|
||||
|
||||
test_two_messages(Strategy) ->
|
||||
|
@ -178,15 +178,15 @@ test_two_messages(Strategy, WithAck) ->
|
|||
Topic = <<"foo/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
{ok, ConnPid1} = emqx_client:start_link([{client_id, ClientId1}]),
|
||||
{ok, _} = emqx_client:connect(ConnPid1),
|
||||
{ok, ConnPid2} = emqx_client:start_link([{client_id, ClientId2}]),
|
||||
{ok, _} = emqx_client:connect(ConnPid2),
|
||||
{ok, ConnPid1} = emqtt:start_link([{client_id, ClientId1}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
{ok, ConnPid2} = emqtt:start_link([{client_id, ClientId2}]),
|
||||
{ok, _} = emqtt:connect(ConnPid2),
|
||||
|
||||
Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
|
||||
emqx_client:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}),
|
||||
emqx_client:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}),
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}),
|
||||
emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}),
|
||||
ct:sleep(100),
|
||||
emqx:publish(Message1),
|
||||
Me = self(),
|
||||
|
@ -210,8 +210,8 @@ test_two_messages(Strategy, WithAck) ->
|
|||
hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
|
||||
_ -> ok
|
||||
end,
|
||||
emqx_client:stop(ConnPid1),
|
||||
emqx_client:stop(ConnPid2),
|
||||
emqtt:stop(ConnPid1),
|
||||
emqtt:stop(ConnPid2),
|
||||
ok.
|
||||
|
||||
last_message(ExpectedPayload, Pids) ->
|
||||
|
|
|
@ -55,9 +55,9 @@ t_sys_mon(_Config) ->
|
|||
end, ?INPUTINFO).
|
||||
|
||||
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
|
||||
{ok, C} = emqx_client:start_link([{host, "localhost"}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
emqx_client:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
emqtt:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1),
|
||||
timer:sleep(100),
|
||||
?SYSMON ! {monitor, PidOrPort, SysMonName, InfoOrPort},
|
||||
receive
|
||||
|
@ -68,7 +68,7 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
|
|||
1000 ->
|
||||
ct:fail("flase")
|
||||
end,
|
||||
emqx_client:stop(C).
|
||||
emqtt:stop(C).
|
||||
|
||||
concat_str(ValidateInfo, InfoOrPort, Info) ->
|
||||
WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]),
|
||||
|
|
|
@ -33,11 +33,11 @@ end_per_suite(_Config) ->
|
|||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
t_start_traces(_Config) ->
|
||||
{ok, T} = emqx_client:start_link([{host, "localhost"},
|
||||
{ok, T} = emqtt:start_link([{host, "localhost"},
|
||||
{client_id, <<"client">>},
|
||||
{username, <<"testuser">>},
|
||||
{password, <<"pass">>}]),
|
||||
emqx_client:connect(T),
|
||||
emqtt:connect(T),
|
||||
|
||||
%% Start tracing
|
||||
emqx_logger:set_log_level(error),
|
||||
|
@ -63,7 +63,7 @@ t_start_traces(_Config) ->
|
|||
emqx_logger:set_log_level(debug),
|
||||
|
||||
%% Client with clientid = "client" publishes a "hi" message to "a/b/c".
|
||||
emqx_client:publish(T, <<"a/b/c">>, <<"hi">>),
|
||||
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
||||
ct:sleep(200),
|
||||
|
||||
%% Verify messages are logged to "tmp/client.log" and "tmp/topic_trace.log", but not "tmp/client2.log".
|
||||
|
@ -75,6 +75,6 @@ t_start_traces(_Config) ->
|
|||
ok = emqx_tracer:stop_trace({client_id, <<"client">>}),
|
||||
ok = emqx_tracer:stop_trace({client_id, <<"client2">>}),
|
||||
ok = emqx_tracer:stop_trace({topic, <<"a/#">>}),
|
||||
emqx_client:disconnect(T),
|
||||
emqtt:disconnect(T),
|
||||
|
||||
emqx_logger:set_log_level(warning).
|
||||
|
|
|
@ -40,7 +40,7 @@ t_basic(_) ->
|
|||
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
||||
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
|
||||
?assertEqual(3, length(recv_msgs(3))),
|
||||
ok = emqx_client:disconnect(C).
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
recv_msgs(Count) ->
|
||||
recv_msgs(Count, []).
|
||||
|
|
Loading…
Reference in New Issue