1250 lines
52 KiB
Erlang
1250 lines
52 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_client).
|
|
|
|
-behaviour(gen_statem).
|
|
|
|
-include("logger.hrl").
|
|
-include("types.hrl").
|
|
-include("emqx_client.hrl").
|
|
|
|
-logger_header("[Client]").
|
|
|
|
-export([ start_link/0
|
|
, start_link/1
|
|
]).
|
|
|
|
-export([ connect/1
|
|
, disconnect/1
|
|
, disconnect/2
|
|
, disconnect/3
|
|
]).
|
|
|
|
-export([ping/1]).
|
|
|
|
%% PubSub
|
|
-export([ subscribe/2
|
|
, subscribe/3
|
|
, subscribe/4
|
|
, publish/2
|
|
, publish/3
|
|
, publish/4
|
|
, publish/5
|
|
, unsubscribe/2
|
|
, unsubscribe/3
|
|
]).
|
|
|
|
%% Puback...
|
|
-export([ puback/2
|
|
, puback/3
|
|
, puback/4
|
|
, pubrec/2
|
|
, pubrec/3
|
|
, pubrec/4
|
|
, pubrel/2
|
|
, pubrel/3
|
|
, pubrel/4
|
|
, pubcomp/2
|
|
, pubcomp/3
|
|
, pubcomp/4
|
|
]).
|
|
|
|
-export([subscriptions/1]).
|
|
|
|
-export([info/1, stop/1]).
|
|
|
|
%% For test cases
|
|
-export([pause/1, resume/1]).
|
|
|
|
-export([ initialized/3
|
|
, waiting_for_connack/3
|
|
, connected/3
|
|
, inflight_full/3
|
|
]).
|
|
|
|
-export([ init/1
|
|
, callback_mode/0
|
|
, handle_event/4
|
|
, terminate/3
|
|
, code_change/4
|
|
]).
|
|
|
|
-export_type([ host/0
|
|
, client/0
|
|
, option/0
|
|
, properties/0
|
|
, payload/0
|
|
, pubopt/0
|
|
, subopt/0
|
|
, mqtt_msg/0
|
|
]).
|
|
|
|
%% Default timeout
|
|
-define(DEFAULT_KEEPALIVE, 60).
|
|
-define(DEFAULT_ACK_TIMEOUT, 30000).
|
|
-define(DEFAULT_CONNECT_TIMEOUT, 60000).
|
|
|
|
-define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}).
|
|
|
|
-define(WILL_MSG(QoS, Retain, Topic, Props, Payload),
|
|
#mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}).
|
|
|
|
-define(NO_CLIENT_ID, <<>>).
|
|
|
|
-type(host() :: inet:ip_address() | inet:hostname()).
|
|
|
|
%% Message handler is a set of callbacks defined to handle MQTT messages
|
|
%% as well as the disconnect event.
|
|
-define(NO_MSG_HDLR, undefined).
|
|
-type(msg_handler() :: #{puback := fun((_) -> any()),
|
|
publish := fun((emqx_types:message()) -> any()),
|
|
disconnected := fun(({reason_code(), _Properties :: term()}) -> any())
|
|
}).
|
|
|
|
-type(option() :: {name, atom()}
|
|
| {owner, pid()}
|
|
| {msg_handler, msg_handler()}
|
|
| {host, host()}
|
|
| {hosts, [{host(), inet:port_number()}]}
|
|
| {port, inet:port_number()}
|
|
| {tcp_opts, [gen_tcp:option()]}
|
|
| {ssl, boolean()}
|
|
| {ssl_opts, [ssl:ssl_option()]}
|
|
| {connect_timeout, pos_integer()}
|
|
| {bridge_mode, boolean()}
|
|
| {client_id, iodata()}
|
|
| {clean_start, boolean()}
|
|
| {username, iodata()}
|
|
| {password, iodata()}
|
|
| {proto_ver, v3 | v4 | v5}
|
|
| {keepalive, non_neg_integer()}
|
|
| {max_inflight, pos_integer()}
|
|
| {retry_interval, timeout()}
|
|
| {will_topic, iodata()}
|
|
| {will_payload, iodata()}
|
|
| {will_retain, boolean()}
|
|
| {will_qos, qos()}
|
|
| {will_props, properties()}
|
|
| {auto_ack, boolean()}
|
|
| {ack_timeout, pos_integer()}
|
|
| {force_ping, boolean()}
|
|
| {properties, properties()}).
|
|
|
|
-type(mqtt_msg() :: #mqtt_msg{}).
|
|
|
|
-record(state, {name :: atom(),
|
|
owner :: pid(),
|
|
msg_handler :: ?NO_MSG_HDLR | msg_handler(),
|
|
host :: host(),
|
|
port :: inet:port_number(),
|
|
hosts :: [{host(), inet:port_number()}],
|
|
socket :: inet:socket(),
|
|
sock_opts :: [emqx_client_sock:option()],
|
|
connect_timeout :: pos_integer(),
|
|
bridge_mode :: boolean(),
|
|
client_id :: binary(),
|
|
clean_start :: boolean(),
|
|
username :: maybe(binary()),
|
|
password :: maybe(binary()),
|
|
proto_ver :: emqx_mqtt_types:version(),
|
|
proto_name :: iodata(),
|
|
keepalive :: non_neg_integer(),
|
|
keepalive_timer :: maybe(reference()),
|
|
force_ping :: boolean(),
|
|
paused :: boolean(),
|
|
will_flag :: boolean(),
|
|
will_msg :: mqtt_msg(),
|
|
properties :: properties(),
|
|
pending_calls :: list(),
|
|
subscriptions :: map(),
|
|
max_inflight :: infinity | pos_integer(),
|
|
inflight :: emqx_inflight:inflight(),
|
|
awaiting_rel :: map(),
|
|
auto_ack :: boolean(),
|
|
ack_timeout :: pos_integer(),
|
|
ack_timer :: reference(),
|
|
retry_interval :: pos_integer(),
|
|
retry_timer :: reference(),
|
|
session_present :: boolean(),
|
|
last_packet_id :: packet_id(),
|
|
parse_state :: emqx_frame:state()
|
|
}).
|
|
|
|
-record(call, {id, from, req, ts}).
|
|
|
|
-type(client() :: pid() | atom()).
|
|
|
|
-type(topic() :: emqx_topic:topic()).
|
|
|
|
-type(payload() :: iodata()).
|
|
|
|
-type(packet_id() :: emqx_mqtt_types:packet_id()).
|
|
|
|
-type(properties() :: emqx_mqtt_types:properties()).
|
|
|
|
-type(qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos()).
|
|
|
|
-type(pubopt() :: {retain, boolean()} | {qos, qos()} | {timeout, timeout()}).
|
|
|
|
-type(subopt() :: {rh, 0 | 1 | 2}
|
|
| {rap, boolean()}
|
|
| {nl, boolean()}
|
|
| {qos, qos()}).
|
|
|
|
-type(reason_code() :: emqx_mqtt_types:reason_code()).
|
|
|
|
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(start_link() -> gen_statem:start_ret()).
|
|
start_link() -> start_link([]).
|
|
|
|
-spec(start_link(map() | [option()]) -> gen_statem:start_ret()).
|
|
start_link(Options) when is_map(Options) ->
|
|
start_link(maps:to_list(Options));
|
|
start_link(Options) when is_list(Options) ->
|
|
ok = emqx_mqtt_props:validate(
|
|
proplists:get_value(properties, Options, #{})),
|
|
case proplists:get_value(name, Options) of
|
|
undefined ->
|
|
gen_statem:start_link(?MODULE, [with_owner(Options)], []);
|
|
Name when is_atom(Name) ->
|
|
gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], [])
|
|
end.
|
|
|
|
with_owner(Options) ->
|
|
case proplists:get_value(owner, Options) of
|
|
Owner when is_pid(Owner) -> Options;
|
|
undefined -> [{owner, self()} | Options]
|
|
end.
|
|
|
|
-spec(connect(client()) -> {ok, properties()} | {error, term()}).
|
|
connect(Client) ->
|
|
gen_statem:call(Client, connect, infinity).
|
|
|
|
-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]} | [{topic(), qos()}])
|
|
-> subscribe_ret()).
|
|
subscribe(Client, Topic) when is_binary(Topic) ->
|
|
subscribe(Client, {Topic, ?QOS_0});
|
|
subscribe(Client, {Topic, QoS}) when is_binary(Topic), is_atom(QoS) ->
|
|
subscribe(Client, {Topic, ?QOS_I(QoS)});
|
|
subscribe(Client, {Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) ->
|
|
subscribe(Client, [{Topic, ?QOS_I(QoS)}]);
|
|
subscribe(Client, Topics) when is_list(Topics) ->
|
|
subscribe(Client, #{}, lists:map(
|
|
fun({Topic, QoS}) when is_binary(Topic), is_atom(QoS) ->
|
|
{Topic, [{qos, ?QOS_I(QoS)}]};
|
|
({Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) ->
|
|
{Topic, [{qos, ?QOS_I(QoS)}]};
|
|
({Topic, Opts}) when is_binary(Topic), is_list(Opts) ->
|
|
{Topic, Opts}
|
|
end, Topics)).
|
|
|
|
-spec(subscribe(client(), topic(), qos() | [subopt()]) ->
|
|
subscribe_ret();
|
|
(client(), properties(), [{topic(), qos() | [subopt()]}]) ->
|
|
subscribe_ret()).
|
|
subscribe(Client, Topic, QoS) when is_binary(Topic), is_atom(QoS) ->
|
|
subscribe(Client, Topic, ?QOS_I(QoS));
|
|
subscribe(Client, Topic, QoS) when is_binary(Topic), ?IS_QOS(QoS) ->
|
|
subscribe(Client, Topic, [{qos, QoS}]);
|
|
subscribe(Client, Topic, Opts) when is_binary(Topic), is_list(Opts) ->
|
|
subscribe(Client, #{}, [{Topic, Opts}]);
|
|
subscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) ->
|
|
Topics1 = [{Topic, parse_subopt(Opts)} || {Topic, Opts} <- Topics],
|
|
gen_statem:call(Client, {subscribe, Properties, Topics1}).
|
|
|
|
-spec(subscribe(client(), properties(), topic(), qos() | [subopt()])
|
|
-> subscribe_ret()).
|
|
subscribe(Client, Properties, Topic, QoS)
|
|
when is_map(Properties), is_binary(Topic), is_atom(QoS) ->
|
|
subscribe(Client, Properties, Topic, ?QOS_I(QoS));
|
|
subscribe(Client, Properties, Topic, QoS)
|
|
when is_map(Properties), is_binary(Topic), ?IS_QOS(QoS) ->
|
|
subscribe(Client, Properties, Topic, [{qos, QoS}]);
|
|
subscribe(Client, Properties, Topic, Opts)
|
|
when is_map(Properties), is_binary(Topic), is_list(Opts) ->
|
|
subscribe(Client, Properties, [{Topic, Opts}]).
|
|
|
|
parse_subopt(Opts) ->
|
|
parse_subopt(Opts, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0}).
|
|
|
|
parse_subopt([], Result) ->
|
|
Result;
|
|
parse_subopt([{rh, I} | Opts], Result) when I >= 0, I =< 2 ->
|
|
parse_subopt(Opts, Result#{rh := I});
|
|
parse_subopt([{rap, true} | Opts], Result) ->
|
|
parse_subopt(Opts, Result#{rap := 1});
|
|
parse_subopt([{rap, false} | Opts], Result) ->
|
|
parse_subopt(Opts, Result#{rap := 0});
|
|
parse_subopt([{nl, true} | Opts], Result) ->
|
|
parse_subopt(Opts, Result#{nl := 1});
|
|
parse_subopt([{nl, false} | Opts], Result) ->
|
|
parse_subopt(Opts, Result#{nl := 0});
|
|
parse_subopt([{qos, QoS} | Opts], Result) ->
|
|
parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}).
|
|
|
|
-spec(publish(client(), topic(), payload()) -> ok | {error, term()}).
|
|
publish(Client, Topic, Payload) when is_binary(Topic) ->
|
|
publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}).
|
|
|
|
-spec(publish(client(), topic(), payload(), qos() | [pubopt()])
|
|
-> ok | {ok, packet_id()} | {error, term()}).
|
|
publish(Client, Topic, Payload, QoS) when is_binary(Topic), is_atom(QoS) ->
|
|
publish(Client, Topic, Payload, [{qos, ?QOS_I(QoS)}]);
|
|
publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) ->
|
|
publish(Client, Topic, Payload, [{qos, QoS}]);
|
|
publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) ->
|
|
publish(Client, Topic, #{}, Payload, Opts).
|
|
|
|
-spec(publish(client(), topic(), properties(), payload(), [pubopt()])
|
|
-> ok | {ok, packet_id()} | {error, term()}).
|
|
publish(Client, Topic, Properties, Payload, Opts)
|
|
when is_binary(Topic), is_map(Properties), is_list(Opts) ->
|
|
ok = emqx_mqtt_props:validate(Properties),
|
|
Retain = proplists:get_bool(retain, Opts),
|
|
QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)),
|
|
publish(Client, #mqtt_msg{qos = QoS,
|
|
retain = Retain,
|
|
topic = Topic,
|
|
props = Properties,
|
|
payload = iolist_to_binary(Payload)}).
|
|
|
|
-spec(publish(client(), #mqtt_msg{}) -> ok | {ok, packet_id()} | {error, term()}).
|
|
publish(Client, Msg) ->
|
|
gen_statem:call(Client, {publish, Msg}).
|
|
|
|
-spec(unsubscribe(client(), topic() | [topic()]) -> subscribe_ret()).
|
|
unsubscribe(Client, Topic) when is_binary(Topic) ->
|
|
unsubscribe(Client, [Topic]);
|
|
unsubscribe(Client, Topics) when is_list(Topics) ->
|
|
unsubscribe(Client, #{}, Topics).
|
|
|
|
-spec(unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret()).
|
|
unsubscribe(Client, Properties, Topic) when is_map(Properties), is_binary(Topic) ->
|
|
unsubscribe(Client, Properties, [Topic]);
|
|
unsubscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) ->
|
|
gen_statem:call(Client, {unsubscribe, Properties, Topics}).
|
|
|
|
-spec(ping(client()) -> pong).
|
|
ping(Client) ->
|
|
gen_statem:call(Client, ping).
|
|
|
|
-spec(disconnect(client()) -> ok).
|
|
disconnect(Client) ->
|
|
disconnect(Client, ?RC_SUCCESS).
|
|
|
|
-spec(disconnect(client(), reason_code()) -> ok).
|
|
disconnect(Client, ReasonCode) ->
|
|
disconnect(Client, ReasonCode, #{}).
|
|
|
|
-spec(disconnect(client(), reason_code(), properties()) -> ok).
|
|
disconnect(Client, ReasonCode, Properties) ->
|
|
gen_statem:call(Client, {disconnect, ReasonCode, Properties}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% For test cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
puback(Client, PacketId) when is_integer(PacketId) ->
|
|
puback(Client, PacketId, ?RC_SUCCESS).
|
|
puback(Client, PacketId, ReasonCode)
|
|
when is_integer(PacketId), is_integer(ReasonCode) ->
|
|
puback(Client, PacketId, ReasonCode, #{}).
|
|
puback(Client, PacketId, ReasonCode, Properties)
|
|
when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
|
|
gen_statem:cast(Client, {puback, PacketId, ReasonCode, Properties}).
|
|
|
|
pubrec(Client, PacketId) when is_integer(PacketId) ->
|
|
pubrec(Client, PacketId, ?RC_SUCCESS).
|
|
pubrec(Client, PacketId, ReasonCode)
|
|
when is_integer(PacketId), is_integer(ReasonCode) ->
|
|
pubrec(Client, PacketId, ReasonCode, #{}).
|
|
pubrec(Client, PacketId, ReasonCode, Properties)
|
|
when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
|
|
gen_statem:cast(Client, {pubrec, PacketId, ReasonCode, Properties}).
|
|
|
|
pubrel(Client, PacketId) when is_integer(PacketId) ->
|
|
pubrel(Client, PacketId, ?RC_SUCCESS).
|
|
pubrel(Client, PacketId, ReasonCode)
|
|
when is_integer(PacketId), is_integer(ReasonCode) ->
|
|
pubrel(Client, PacketId, ReasonCode, #{}).
|
|
pubrel(Client, PacketId, ReasonCode, Properties)
|
|
when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
|
|
gen_statem:cast(Client, {pubrel, PacketId, ReasonCode, Properties}).
|
|
|
|
pubcomp(Client, PacketId) when is_integer(PacketId) ->
|
|
pubcomp(Client, PacketId, ?RC_SUCCESS).
|
|
pubcomp(Client, PacketId, ReasonCode)
|
|
when is_integer(PacketId), is_integer(ReasonCode) ->
|
|
pubcomp(Client, PacketId, ReasonCode, #{}).
|
|
pubcomp(Client, PacketId, ReasonCode, Properties)
|
|
when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
|
|
gen_statem:cast(Client, {pubcomp, PacketId, ReasonCode, Properties}).
|
|
|
|
subscriptions(Client) ->
|
|
gen_statem:call(Client, subscriptions).
|
|
|
|
info(Client) ->
|
|
gen_statem:call(Client, info).
|
|
|
|
stop(Client) ->
|
|
gen_statem:call(Client, stop).
|
|
|
|
pause(Client) ->
|
|
gen_statem:call(Client, pause).
|
|
|
|
resume(Client) ->
|
|
gen_statem:call(Client, resume).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% gen_statem callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
init([Options]) ->
|
|
process_flag(trap_exit, true),
|
|
ClientId = case {proplists:get_value(proto_ver, Options, v4),
|
|
proplists:get_value(client_id, Options)} of
|
|
{v5, undefined} -> ?NO_CLIENT_ID;
|
|
{_ver, undefined} -> random_client_id();
|
|
{_ver, Id} -> iolist_to_binary(Id)
|
|
end,
|
|
State = init(Options, #state{host = {127,0,0,1},
|
|
port = 1883,
|
|
hosts = [],
|
|
sock_opts = [],
|
|
bridge_mode = false,
|
|
client_id = ClientId,
|
|
clean_start = true,
|
|
proto_ver = ?MQTT_PROTO_V4,
|
|
proto_name = <<"MQTT">>,
|
|
keepalive = ?DEFAULT_KEEPALIVE,
|
|
force_ping = false,
|
|
paused = false,
|
|
will_flag = false,
|
|
will_msg = #mqtt_msg{},
|
|
pending_calls = [],
|
|
subscriptions = #{},
|
|
max_inflight = infinity,
|
|
inflight = emqx_inflight:new(0),
|
|
awaiting_rel = #{},
|
|
properties = #{},
|
|
auto_ack = true,
|
|
ack_timeout = ?DEFAULT_ACK_TIMEOUT,
|
|
retry_interval = 0,
|
|
connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
|
|
last_packet_id = 1
|
|
}),
|
|
{ok, initialized, init_parse_state(State)}.
|
|
|
|
random_client_id() ->
|
|
rand:seed(exsplus, erlang:timestamp()),
|
|
I1 = rand:uniform(round(math:pow(2, 48))) - 1,
|
|
I2 = rand:uniform(round(math:pow(2, 32))) - 1,
|
|
{ok, Host} = inet:gethostname(),
|
|
iolist_to_binary(["emqx-client-", Host, "-", io_lib:format("~12.16.0b~8.16.0b", [I1, I2])]).
|
|
|
|
init([], State) ->
|
|
State;
|
|
init([{name, Name} | Opts], State) ->
|
|
init(Opts, State#state{name = Name});
|
|
init([{owner, Owner} | Opts], State) when is_pid(Owner) ->
|
|
link(Owner),
|
|
init(Opts, State#state{owner = Owner});
|
|
init([{msg_handler, Hdlr} | Opts], State) ->
|
|
init(Opts, State#state{msg_handler = Hdlr});
|
|
init([{host, Host} | Opts], State) ->
|
|
init(Opts, State#state{host = Host});
|
|
init([{port, Port} | Opts], State) ->
|
|
init(Opts, State#state{port = Port});
|
|
init([{hosts, Hosts} | Opts], State) ->
|
|
Hosts1 =
|
|
lists:foldl(fun({Host, Port}, Acc) ->
|
|
[{Host, Port}|Acc];
|
|
(Host, Acc) ->
|
|
[{Host, 1883}|Acc]
|
|
end, [], Hosts),
|
|
init(Opts, State#state{hosts = Hosts1});
|
|
init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
|
|
init(Opts, State#state{sock_opts = emqx_misc:merge_opts(SockOpts, TcpOpts)});
|
|
init([{ssl, EnableSsl} | Opts], State) ->
|
|
case lists:keytake(ssl_opts, 1, Opts) of
|
|
{value, SslOpts, WithOutSslOpts} ->
|
|
init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State);
|
|
false ->
|
|
init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State)
|
|
end;
|
|
init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
|
|
case lists:keytake(ssl, 1, Opts) of
|
|
{value, {ssl, true}, WithOutEnableSsl} ->
|
|
ok = ssl:start(),
|
|
SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]),
|
|
init(WithOutEnableSsl, State#state{sock_opts = SockOpts1});
|
|
{value, {ssl, false}, WithOutEnableSsl} ->
|
|
init(WithOutEnableSsl, State);
|
|
false ->
|
|
init(Opts, State)
|
|
end;
|
|
init([{client_id, ClientId} | Opts], State) ->
|
|
init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
|
|
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
|
|
init(Opts, State#state{clean_start = CleanStart});
|
|
init([{username, Username} | Opts], State) ->
|
|
init(Opts, State#state{username = iolist_to_binary(Username)});
|
|
init([{password, Password} | Opts], State) ->
|
|
init(Opts, State#state{password = iolist_to_binary(Password)});
|
|
init([{keepalive, Secs} | Opts], State) ->
|
|
init(Opts, State#state{keepalive = Secs});
|
|
init([{proto_ver, v3} | Opts], State) ->
|
|
init(Opts, State#state{proto_ver = ?MQTT_PROTO_V3,
|
|
proto_name = <<"MQIsdp">>});
|
|
init([{proto_ver, v4} | Opts], State) ->
|
|
init(Opts, State#state{proto_ver = ?MQTT_PROTO_V4,
|
|
proto_name = <<"MQTT">>});
|
|
init([{proto_ver, v5} | Opts], State) ->
|
|
init(Opts, State#state{proto_ver = ?MQTT_PROTO_V5,
|
|
proto_name = <<"MQTT">>});
|
|
init([{will_topic, Topic} | Opts], State = #state{will_msg = WillMsg}) ->
|
|
WillMsg1 = init_will_msg({topic, Topic}, WillMsg),
|
|
init(Opts, State#state{will_flag = true, will_msg = WillMsg1});
|
|
init([{will_props, Properties} | Opts], State = #state{will_msg = WillMsg}) ->
|
|
init(Opts, State#state{will_msg = init_will_msg({props, Properties}, WillMsg)});
|
|
init([{will_payload, Payload} | Opts], State = #state{will_msg = WillMsg}) ->
|
|
init(Opts, State#state{will_msg = init_will_msg({payload, Payload}, WillMsg)});
|
|
init([{will_retain, Retain} | Opts], State = #state{will_msg = WillMsg}) ->
|
|
init(Opts, State#state{will_msg = init_will_msg({retain, Retain}, WillMsg)});
|
|
init([{will_qos, QoS} | Opts], State = #state{will_msg = WillMsg}) ->
|
|
init(Opts, State#state{will_msg = init_will_msg({qos, QoS}, WillMsg)});
|
|
init([{connect_timeout, Timeout}| Opts], State) ->
|
|
init(Opts, State#state{connect_timeout = timer:seconds(Timeout)});
|
|
init([{ack_timeout, Timeout}| Opts], State) ->
|
|
init(Opts, State#state{ack_timeout = timer:seconds(Timeout)});
|
|
init([force_ping | Opts], State) ->
|
|
init(Opts, State#state{force_ping = true});
|
|
init([{force_ping, ForcePing} | Opts], State) when is_boolean(ForcePing) ->
|
|
init(Opts, State#state{force_ping = ForcePing});
|
|
init([{properties, Properties} | Opts], State = #state{properties = InitProps}) ->
|
|
init(Opts, State#state{properties = maps:merge(InitProps, Properties)});
|
|
init([{max_inflight, infinity} | Opts], State) ->
|
|
init(Opts, State#state{max_inflight = infinity,
|
|
inflight = emqx_inflight:new(0)});
|
|
init([{max_inflight, I} | Opts], State) when is_integer(I) ->
|
|
init(Opts, State#state{max_inflight = I,
|
|
inflight = emqx_inflight:new(I)});
|
|
init([auto_ack | Opts], State) ->
|
|
init(Opts, State#state{auto_ack = true});
|
|
init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) ->
|
|
init(Opts, State#state{auto_ack = AutoAck});
|
|
init([{retry_interval, I} | Opts], State) ->
|
|
init(Opts, State#state{retry_interval = timer:seconds(I)});
|
|
init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) ->
|
|
init(Opts, State#state{bridge_mode = Mode});
|
|
init([_Opt | Opts], State) ->
|
|
init(Opts, State).
|
|
|
|
init_will_msg({topic, Topic}, WillMsg) ->
|
|
WillMsg#mqtt_msg{topic = iolist_to_binary(Topic)};
|
|
init_will_msg({props, Props}, WillMsg) ->
|
|
WillMsg#mqtt_msg{props = Props};
|
|
init_will_msg({payload, Payload}, WillMsg) ->
|
|
WillMsg#mqtt_msg{payload = iolist_to_binary(Payload)};
|
|
init_will_msg({retain, Retain}, WillMsg) when is_boolean(Retain) ->
|
|
WillMsg#mqtt_msg{retain = Retain};
|
|
init_will_msg({qos, QoS}, WillMsg) ->
|
|
WillMsg#mqtt_msg{qos = ?QOS_I(QoS)}.
|
|
|
|
init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) ->
|
|
MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE),
|
|
ParseState = emqx_frame:initial_parse_state(
|
|
#{max_size => MaxSize, version => Ver}),
|
|
State#state{parse_state = ParseState}.
|
|
|
|
callback_mode() -> state_functions.
|
|
|
|
initialized({call, From}, connect, State = #state{sock_opts = SockOpts,
|
|
connect_timeout = Timeout}) ->
|
|
case sock_connect(hosts(State), SockOpts, Timeout) of
|
|
{ok, Sock} ->
|
|
case mqtt_connect(run_sock(State#state{socket = Sock})) of
|
|
{ok, NewState} ->
|
|
{next_state, waiting_for_connack,
|
|
add_call(new_call(connect, From), NewState), [Timeout]};
|
|
Error = {error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, Error}]}
|
|
end;
|
|
Error = {error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, Error}]}
|
|
end;
|
|
|
|
initialized(EventType, EventContent, State) ->
|
|
handle_event(EventType, EventContent, initialized, State).
|
|
|
|
mqtt_connect(State = #state{client_id = ClientId,
|
|
clean_start = CleanStart,
|
|
bridge_mode = IsBridge,
|
|
username = Username,
|
|
password = Password,
|
|
proto_ver = ProtoVer,
|
|
proto_name = ProtoName,
|
|
keepalive = KeepAlive,
|
|
will_flag = WillFlag,
|
|
will_msg = WillMsg,
|
|
properties = Properties}) ->
|
|
?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
|
|
ConnProps = emqx_mqtt_props:filter(?CONNECT, Properties),
|
|
send(?CONNECT_PACKET(
|
|
#mqtt_packet_connect{proto_ver = ProtoVer,
|
|
proto_name = ProtoName,
|
|
is_bridge = IsBridge,
|
|
clean_start = CleanStart,
|
|
will_flag = WillFlag,
|
|
will_qos = WillQoS,
|
|
will_retain = WillRetain,
|
|
keepalive = KeepAlive,
|
|
properties = ConnProps,
|
|
client_id = ClientId,
|
|
will_props = WillProps,
|
|
will_topic = WillTopic,
|
|
will_payload = WillPayload,
|
|
username = Username,
|
|
password = Password}), State).
|
|
|
|
waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
|
|
SessPresent,
|
|
Properties),
|
|
State = #state{properties = AllProps,
|
|
client_id = ClientId}) ->
|
|
case take_call(connect, State) of
|
|
{value, #call{from = From}, State1} ->
|
|
AllProps1 = case Properties of
|
|
undefined -> AllProps;
|
|
_ -> maps:merge(AllProps, Properties)
|
|
end,
|
|
Reply = {ok, Properties},
|
|
State2 = State1#state{client_id = assign_id(ClientId, AllProps1),
|
|
properties = AllProps1,
|
|
session_present = SessPresent},
|
|
{next_state, connected, ensure_keepalive_timer(State2),
|
|
[{reply, From, Reply}]};
|
|
false ->
|
|
{stop, bad_connack}
|
|
end;
|
|
|
|
waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode,
|
|
_SessPresent,
|
|
Properties),
|
|
State = #state{proto_ver = ProtoVer}) ->
|
|
Reason = emqx_reason_codes:name(ReasonCode, ProtoVer),
|
|
case take_call(connect, State) of
|
|
{value, #call{from = From}, _State} ->
|
|
Reply = {error, {Reason, Properties}},
|
|
{stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]};
|
|
false -> {stop, connack_error}
|
|
end;
|
|
|
|
waiting_for_connack(timeout, _Timeout, State) ->
|
|
case take_call(connect, State) of
|
|
{value, #call{from = From}, _State} ->
|
|
Reply = {error, connack_timeout},
|
|
{stop_and_reply, connack_timeout, [{reply, From, Reply}]};
|
|
false -> {stop, connack_timeout}
|
|
end;
|
|
|
|
waiting_for_connack(EventType, EventContent, State) ->
|
|
case take_call(connect, State) of
|
|
{value, #call{from = From}, _State} ->
|
|
case handle_event(EventType, EventContent, waiting_for_connack, State) of
|
|
{stop, Reason, State} ->
|
|
Reply = {error, {Reason, EventContent}},
|
|
{stop_and_reply, Reason, [{reply, From, Reply}]};
|
|
StateCallbackResult ->
|
|
StateCallbackResult
|
|
end;
|
|
false -> {stop, connack_timeout}
|
|
end.
|
|
|
|
connected({call, From}, subscriptions, #state{subscriptions = Subscriptions}) ->
|
|
{keep_state_and_data, [{reply, From, maps:to_list(Subscriptions)}]};
|
|
|
|
connected({call, From}, info, State) ->
|
|
Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))),
|
|
{keep_state_and_data, [{reply, From, Info}]};
|
|
|
|
connected({call, From}, pause, State) ->
|
|
{keep_state, State#state{paused = true}, [{reply, From, ok}]};
|
|
|
|
connected({call, From}, resume, State) ->
|
|
{keep_state, State#state{paused = false}, [{reply, From, ok}]};
|
|
|
|
connected({call, From}, client_id, #state{client_id = ClientId}) ->
|
|
{keep_state_and_data, [{reply, From, ClientId}]};
|
|
|
|
connected({call, From}, SubReq = {subscribe, Properties, Topics},
|
|
State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) ->
|
|
case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
|
|
{ok, NewState} ->
|
|
Call = new_call({subscribe, PacketId}, From, SubReq),
|
|
Subscriptions1 =
|
|
lists:foldl(fun({Topic, Opts}, Acc) ->
|
|
maps:put(Topic, Opts, Acc)
|
|
end, Subscriptions, Topics),
|
|
{keep_state, ensure_ack_timer(add_call(Call,NewState#state{subscriptions = Subscriptions1}))};
|
|
Error = {error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, Error}]}
|
|
end;
|
|
|
|
connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) ->
|
|
case send(Msg, State) of
|
|
{ok, NewState} ->
|
|
{keep_state, NewState, [{reply, From, ok}]};
|
|
Error = {error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, Error}]}
|
|
end;
|
|
|
|
connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
|
|
State = #state{inflight = Inflight, last_packet_id = PacketId})
|
|
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
|
|
Msg1 = Msg#mqtt_msg{packet_id = PacketId},
|
|
case send(Msg1, State) of
|
|
{ok, NewState} ->
|
|
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
|
|
State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}),
|
|
Actions = [{reply, From, {ok, PacketId}}],
|
|
case emqx_inflight:is_full(Inflight1) of
|
|
true -> {next_state, inflight_full, State1, Actions};
|
|
false -> {keep_state, State1, Actions}
|
|
end;
|
|
{error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
|
|
end;
|
|
|
|
connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics},
|
|
State = #state{last_packet_id = PacketId}) ->
|
|
case send(?UNSUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
|
|
{ok, NewState} ->
|
|
Call = new_call({unsubscribe, PacketId}, From, UnsubReq),
|
|
{keep_state, ensure_ack_timer(add_call(Call, NewState))};
|
|
Error = {error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, Error}]}
|
|
end;
|
|
|
|
connected({call, From}, ping, State) ->
|
|
case send(?PACKET(?PINGREQ), State) of
|
|
{ok, NewState} ->
|
|
Call = new_call(ping, From),
|
|
{keep_state, ensure_ack_timer(add_call(Call, NewState))};
|
|
Error = {error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, Error}]}
|
|
end;
|
|
|
|
connected({call, From}, {disconnect, ReasonCode, Properties}, State) ->
|
|
case send(?DISCONNECT_PACKET(ReasonCode, Properties), State) of
|
|
{ok, NewState} ->
|
|
{stop_and_reply, normal, [{reply, From, ok}], NewState};
|
|
Error = {error, Reason} ->
|
|
{stop_and_reply, Reason, [{reply, From, Error}]}
|
|
end;
|
|
|
|
connected(cast, {puback, PacketId, ReasonCode, Properties}, State) ->
|
|
send_puback(?PUBACK_PACKET(PacketId, ReasonCode, Properties), State);
|
|
|
|
connected(cast, {pubrec, PacketId, ReasonCode, Properties}, State) ->
|
|
send_puback(?PUBREC_PACKET(PacketId, ReasonCode, Properties), State);
|
|
|
|
connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) ->
|
|
send_puback(?PUBREL_PACKET(PacketId, ReasonCode, Properties), State);
|
|
|
|
connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
|
|
send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State);
|
|
|
|
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) ->
|
|
keep_state_and_data;
|
|
|
|
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
|
|
{keep_state, deliver(packet_to_msg(Packet), State)};
|
|
|
|
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
|
|
publish_process(?QOS_1, Packet, State);
|
|
|
|
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
|
|
publish_process(?QOS_2, Packet, State);
|
|
|
|
connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
|
|
{keep_state, delete_inflight(PubAck, State)};
|
|
|
|
connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) ->
|
|
send_puback(?PUBREL_PACKET(PacketId),
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
{value, {publish, _Msg, _Ts}} ->
|
|
Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight),
|
|
State#state{inflight = Inflight1};
|
|
{value, {pubrel, _Ref, _Ts}} ->
|
|
?LOG(notice, "Duplicated PUBREC Packet: ~p", [PacketId]),
|
|
State;
|
|
none ->
|
|
?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId]),
|
|
State
|
|
end);
|
|
|
|
%%TODO::... if auto_ack is false, should we take PacketId from the map?
|
|
connected(cast, ?PUBREL_PACKET(PacketId),
|
|
State = #state{awaiting_rel = AwaitingRel, auto_ack = AutoAck}) ->
|
|
case maps:take(PacketId, AwaitingRel) of
|
|
{Packet, AwaitingRel1} ->
|
|
NewState = deliver(packet_to_msg(Packet), State#state{awaiting_rel = AwaitingRel1}),
|
|
case AutoAck of
|
|
true -> send_puback(?PUBCOMP_PACKET(PacketId), NewState);
|
|
false -> {keep_state, NewState}
|
|
end;
|
|
error ->
|
|
?LOG(warning, "Unexpected PUBREL: ~p", [PacketId]),
|
|
keep_state_and_data
|
|
end;
|
|
|
|
connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
|
|
{keep_state, delete_inflight(PubComp, State)};
|
|
|
|
connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
|
|
State = #state{subscriptions = _Subscriptions}) ->
|
|
case take_call({subscribe, PacketId}, State) of
|
|
{value, #call{from = From}, NewState} ->
|
|
%%TODO: Merge reason codes to subscriptions?
|
|
Reply = {ok, Properties, ReasonCodes},
|
|
{keep_state, NewState, [{reply, From, Reply}]};
|
|
false ->
|
|
keep_state_and_data
|
|
end;
|
|
|
|
connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
|
|
State = #state{subscriptions = Subscriptions}) ->
|
|
case take_call({unsubscribe, PacketId}, State) of
|
|
{value, #call{from = From, req = {_, _, Topics}}, NewState} ->
|
|
Subscriptions1 =
|
|
lists:foldl(fun(Topic, Acc) ->
|
|
maps:remove(Topic, Acc)
|
|
end, Subscriptions, Topics),
|
|
{keep_state, NewState#state{subscriptions = Subscriptions1},
|
|
[{reply, From, {ok, Properties, ReasonCodes}}]};
|
|
false ->
|
|
keep_state_and_data
|
|
end;
|
|
|
|
connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) ->
|
|
keep_state_and_data;
|
|
connected(cast, ?PACKET(?PINGRESP), State) ->
|
|
case take_call(ping, State) of
|
|
{value, #call{from = From}, NewState} ->
|
|
{keep_state, NewState, [{reply, From, pong}]};
|
|
false ->
|
|
keep_state_and_data
|
|
end;
|
|
|
|
connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) ->
|
|
{stop, {disconnected, ReasonCode, Properties}, State};
|
|
|
|
connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) ->
|
|
case send(?PACKET(?PINGREQ), State) of
|
|
{ok, NewState} ->
|
|
{keep_state, ensure_keepalive_timer(NewState)};
|
|
Error -> {stop, Error}
|
|
end;
|
|
|
|
connected(info, {timeout, TRef, keepalive},
|
|
State = #state{socket = Sock, paused = Paused, keepalive_timer = TRef}) ->
|
|
case (not Paused) andalso should_ping(Sock) of
|
|
true ->
|
|
case send(?PACKET(?PINGREQ), State) of
|
|
{ok, NewState} ->
|
|
{keep_state, ensure_keepalive_timer(NewState), [hibernate]};
|
|
Error -> {stop, Error}
|
|
end;
|
|
false ->
|
|
{keep_state, ensure_keepalive_timer(State), [hibernate]};
|
|
{error, Reason} ->
|
|
{stop, Reason}
|
|
end;
|
|
|
|
connected(info, {timeout, TRef, ack}, State = #state{ack_timer = TRef,
|
|
ack_timeout = Timeout,
|
|
pending_calls = Calls}) ->
|
|
NewState = State#state{ack_timer = undefined,
|
|
pending_calls = timeout_calls(Timeout, Calls)},
|
|
{keep_state, ensure_ack_timer(NewState)};
|
|
|
|
connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef,
|
|
inflight = Inflight}) ->
|
|
case emqx_inflight:is_empty(Inflight) of
|
|
true -> {keep_state, State#state{retry_timer = undefined}};
|
|
false -> retry_send(State)
|
|
end;
|
|
|
|
connected(EventType, EventContent, Data) ->
|
|
handle_event(EventType, EventContent, connected, Data).
|
|
|
|
inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
|
|
{keep_state_and_data, [postpone]};
|
|
inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
|
|
delete_inflight_when_full(PubAck, State);
|
|
inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
|
|
delete_inflight_when_full(PubComp, State);
|
|
inflight_full(EventType, EventContent, Data) ->
|
|
%% inflight_full is a sub-state of connected state,
|
|
%% delegate all other events to connected state.
|
|
connected(EventType, EventContent, Data).
|
|
|
|
handle_event({call, From}, stop, _StateName, _State) ->
|
|
{stop_and_reply, normal, [{reply, From, ok}]};
|
|
handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State)
|
|
when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
|
|
?LOG(debug, "RECV Data: ~p", [Data]),
|
|
process_incoming(Data, [], run_sock(State));
|
|
|
|
handle_event(info, {Error, _Sock, Reason}, _StateName, State)
|
|
when Error =:= tcp_error; Error =:= ssl_error ->
|
|
?LOG(error, "The connection error occured ~p, reason:~p", [Error, Reason]),
|
|
{stop, {shutdown, Reason}, State};
|
|
|
|
handle_event(info, {Closed, _Sock}, _StateName, State)
|
|
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
|
|
?LOG(debug, "~p", [Closed]),
|
|
{stop, {shutdown, Closed}, State};
|
|
|
|
handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
|
|
?LOG(debug, "Got EXIT from owner, Reason: ~p", [Reason]),
|
|
{stop, {shutdown, Reason}, State};
|
|
|
|
handle_event(info, {inet_reply, _Sock, ok}, _, _State) ->
|
|
keep_state_and_data;
|
|
|
|
handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
|
|
?LOG(error, "Got tcp error: ~p", [Reason]),
|
|
{stop, {shutdown, Reason}, State};
|
|
|
|
handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) ->
|
|
?LOG(info, "State: ~s, Unexpected Event: (info, ~p)",
|
|
[StateName, EventContent]),
|
|
keep_state_and_data;
|
|
|
|
handle_event(EventType, EventContent, StateName, _StateData) ->
|
|
?LOG(error, "State: ~s, Unexpected Event: (~p, ~p)",
|
|
[StateName, EventType, EventContent]),
|
|
keep_state_and_data.
|
|
|
|
%% Mandatory callback functions
|
|
terminate(Reason, _StateName, State = #state{socket = Socket}) ->
|
|
case Reason of
|
|
{disconnected, ReasonCode, Properties} ->
|
|
%% backward compatible
|
|
ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties});
|
|
_ ->
|
|
ok = eval_msg_handler(State, disconnected, Reason)
|
|
end,
|
|
case Socket =:= undefined of
|
|
true -> ok;
|
|
_ -> emqx_client_sock:close(Socket)
|
|
end.
|
|
|
|
code_change(_Vsn, State, Data, _Extra) ->
|
|
{ok, State, Data}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
should_ping(Sock) ->
|
|
case emqx_client_sock:getstat(Sock, [send_oct]) of
|
|
{ok, [{send_oct, Val}]} ->
|
|
OldVal = get(send_oct), put(send_oct, Val),
|
|
OldVal == undefined orelse OldVal == Val;
|
|
Error = {error, _Reason} ->
|
|
Error
|
|
end.
|
|
|
|
delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties),
|
|
State = #state{inflight = Inflight}) ->
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
{value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
|
|
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
|
|
reason_code => ReasonCode,
|
|
properties => Properties}),
|
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
|
none ->
|
|
?LOG(warning, "Unexpected PUBACK: ~p", [PacketId]),
|
|
State
|
|
end;
|
|
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
|
|
State = #state{inflight = Inflight}) ->
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
{value, {pubrel, _PacketId, _Ts}} ->
|
|
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
|
|
reason_code => ReasonCode,
|
|
properties => Properties}),
|
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
|
none ->
|
|
?LOG(warning, "Unexpected PUBCOMP Packet: ~p", [PacketId]),
|
|
State
|
|
end.
|
|
|
|
delete_inflight_when_full(Packet, State0) ->
|
|
State = #state{inflight = Inflight} = delete_inflight(Packet, State0),
|
|
case emqx_inflight:is_full(Inflight) of
|
|
true -> {keep_state, State};
|
|
false -> {next_state, connected, State}
|
|
end.
|
|
|
|
assign_id(?NO_CLIENT_ID, Props) ->
|
|
case maps:find('Assigned-Client-Identifier', Props) of
|
|
{ok, Value} ->
|
|
Value;
|
|
_ ->
|
|
error(bad_client_id)
|
|
end;
|
|
assign_id(Id, _Props) ->
|
|
Id.
|
|
|
|
publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId),
|
|
State0 = #state{auto_ack = AutoAck}) ->
|
|
State = deliver(packet_to_msg(Packet), State0),
|
|
case AutoAck of
|
|
true -> send_puback(?PUBACK_PACKET(PacketId), State);
|
|
false -> {keep_state, State}
|
|
end;
|
|
publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
|
|
State = #state{awaiting_rel = AwaitingRel}) ->
|
|
case send_puback(?PUBREC_PACKET(PacketId), State) of
|
|
{keep_state, NewState} ->
|
|
AwaitingRel1 = maps:put(PacketId, Packet, AwaitingRel),
|
|
{keep_state, NewState#state{awaiting_rel = AwaitingRel1}};
|
|
Stop -> Stop
|
|
end.
|
|
|
|
ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) ->
|
|
ensure_keepalive_timer(timer:seconds(Secs), State#state{keepalive = Secs});
|
|
ensure_keepalive_timer(State = #state{keepalive = 0}) ->
|
|
State;
|
|
ensure_keepalive_timer(State = #state{keepalive = I}) ->
|
|
ensure_keepalive_timer(timer:seconds(I), State).
|
|
ensure_keepalive_timer(I, State) when is_integer(I) ->
|
|
State#state{keepalive_timer = erlang:start_timer(I, self(), keepalive)}.
|
|
|
|
new_call(Id, From) ->
|
|
new_call(Id, From, undefined).
|
|
new_call(Id, From, Req) ->
|
|
#call{id = Id, from = From, req = Req, ts = os:timestamp()}.
|
|
|
|
add_call(Call, Data = #state{pending_calls = Calls}) ->
|
|
Data#state{pending_calls = [Call | Calls]}.
|
|
|
|
take_call(Id, Data = #state{pending_calls = Calls}) ->
|
|
case lists:keytake(Id, #call.id, Calls) of
|
|
{value, Call, Left} ->
|
|
{value, Call, Data#state{pending_calls = Left}};
|
|
false -> false
|
|
end.
|
|
|
|
timeout_calls(Timeout, Calls) ->
|
|
timeout_calls(os:timestamp(), Timeout, Calls).
|
|
timeout_calls(Now, Timeout, Calls) ->
|
|
lists:foldl(fun(C = #call{from = From, ts = Ts}, Acc) ->
|
|
case (timer:now_diff(Now, Ts) div 1000) >= Timeout of
|
|
true -> From ! {error, ack_timeout},
|
|
Acc;
|
|
false -> [C | Acc]
|
|
end
|
|
end, [], Calls).
|
|
|
|
ensure_ack_timer(State = #state{ack_timer = undefined,
|
|
ack_timeout = Timeout,
|
|
pending_calls = Calls}) when length(Calls) > 0 ->
|
|
State#state{ack_timer = erlang:start_timer(Timeout, self(), ack)};
|
|
ensure_ack_timer(State) -> State.
|
|
|
|
ensure_retry_timer(State = #state{retry_interval = Interval}) ->
|
|
do_ensure_retry_timer(Interval, State).
|
|
|
|
do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
|
|
when Interval > 0 ->
|
|
State#state{retry_timer = erlang:start_timer(Interval, self(), retry)};
|
|
do_ensure_retry_timer(_Interval, State) ->
|
|
State.
|
|
|
|
retry_send(State = #state{inflight = Inflight}) ->
|
|
SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end,
|
|
Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)),
|
|
retry_send(Msgs, os:timestamp(), State ).
|
|
|
|
retry_send([], _Now, State) ->
|
|
{keep_state, ensure_retry_timer(State)};
|
|
retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interval}) ->
|
|
Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms
|
|
case (Diff >= Interval) of
|
|
true -> case retry_send(Type, Msg, Now, State) of
|
|
{ok, NewState} -> retry_send(Msgs, Now, NewState);
|
|
{error, Error} -> {stop, Error}
|
|
end;
|
|
false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)}
|
|
end.
|
|
|
|
retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId},
|
|
Now, State = #state{inflight = Inflight}) ->
|
|
Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)},
|
|
case send(Msg1, State) of
|
|
{ok, NewState} ->
|
|
Inflight1 = emqx_inflight:update(PacketId, {publish, Msg1, Now}, Inflight),
|
|
{ok, NewState#state{inflight = Inflight1}};
|
|
Error = {error, _Reason} ->
|
|
Error
|
|
end;
|
|
retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
|
|
case send(?PUBREL_PACKET(PacketId), State) of
|
|
{ok, NewState} ->
|
|
Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight),
|
|
{ok, NewState#state{inflight = Inflight1}};
|
|
Error = {error, _Reason} ->
|
|
Error
|
|
end.
|
|
|
|
deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
|
|
topic = Topic, props = Props, payload = Payload},
|
|
State) ->
|
|
Msg = #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId,
|
|
topic => Topic, properties => Props, payload => Payload,
|
|
client_pid => self()},
|
|
ok = eval_msg_handler(State, publish, Msg),
|
|
State.
|
|
|
|
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
|
|
owner = Owner},
|
|
disconnected, {ReasonCode, Properties}) ->
|
|
%% Special handling for disconnected message when there is no handler callback
|
|
Owner ! {disconnected, ReasonCode, Properties},
|
|
ok;
|
|
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR},
|
|
disconnected, _OtherReason) ->
|
|
%% do nothing to be backward compatible
|
|
ok;
|
|
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
|
|
owner = Owner}, Kind, Msg) ->
|
|
Owner ! {Kind, Msg},
|
|
ok;
|
|
eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) ->
|
|
F = maps:get(Kind, Handler),
|
|
_ = F(Msg),
|
|
ok.
|
|
|
|
packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
|
dup = Dup,
|
|
qos = QoS,
|
|
retain = R},
|
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
|
packet_id = PacketId,
|
|
properties = Props},
|
|
payload = Payload}) ->
|
|
#mqtt_msg{qos = QoS, retain = R, dup = Dup, packet_id = PacketId,
|
|
topic = Topic, props = Props, payload = Payload}.
|
|
|
|
msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
|
|
topic = Topic, props = Props, payload = Payload}) ->
|
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
|
qos = QoS,
|
|
retain = Retain,
|
|
dup = Dup},
|
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
|
packet_id = PacketId,
|
|
properties = Props},
|
|
payload = Payload}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Socket Connect/Send
|
|
|
|
sock_connect(Hosts, SockOpts, Timeout) ->
|
|
sock_connect(Hosts, SockOpts, Timeout, {error, no_hosts}).
|
|
|
|
sock_connect([], _SockOpts, _Timeout, LastErr) ->
|
|
LastErr;
|
|
sock_connect([{Host, Port} | Hosts], SockOpts, Timeout, _LastErr) ->
|
|
case emqx_client_sock:connect(Host, Port, SockOpts, Timeout) of
|
|
{ok, Socket} -> {ok, Socket};
|
|
Err = {error, _Reason} ->
|
|
sock_connect(Hosts, SockOpts, Timeout, Err)
|
|
end.
|
|
|
|
hosts(#state{hosts = [], host = Host, port = Port}) ->
|
|
[{Host, Port}];
|
|
hosts(#state{hosts = Hosts}) -> Hosts.
|
|
|
|
send_puback(Packet, State) ->
|
|
case send(Packet, State) of
|
|
{ok, NewState} -> {keep_state, NewState};
|
|
{error, Reason} -> {stop, {shutdown, Reason}}
|
|
end.
|
|
|
|
send(Msg, State) when is_record(Msg, mqtt_msg) ->
|
|
send(msg_to_packet(Msg), State);
|
|
|
|
send(Packet, State = #state{socket = Sock, proto_ver = Ver})
|
|
when is_record(Packet, mqtt_packet) ->
|
|
Data = emqx_frame:serialize(Packet, #{version => Ver}),
|
|
?LOG(debug, "SEND Data: ~1000p", [Packet]),
|
|
case emqx_client_sock:send(Sock, Data) of
|
|
ok -> {ok, bump_last_packet_id(State)};
|
|
Error -> Error
|
|
end.
|
|
|
|
run_sock(State = #state{socket = Sock}) ->
|
|
emqx_client_sock:setopts(Sock, [{active, once}]), State.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Process incomming
|
|
|
|
process_incoming(<<>>, Packets, State) ->
|
|
{keep_state, State, next_events(Packets)};
|
|
|
|
process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) ->
|
|
try emqx_frame:parse(Bytes, ParseState) of
|
|
{ok, Packet, Rest, NParseState} ->
|
|
process_incoming(Rest, [Packet|Packets], State#state{parse_state = NParseState});
|
|
{ok, NParseState} ->
|
|
{keep_state, State#state{parse_state = NParseState}, next_events(Packets)};
|
|
{error, Reason} ->
|
|
{stop, Reason}
|
|
catch
|
|
error:Error ->
|
|
{stop, Error}
|
|
end.
|
|
|
|
next_events([]) ->
|
|
[];
|
|
next_events([Packet]) ->
|
|
{next_event, cast, Packet};
|
|
next_events(Packets) ->
|
|
[{next_event, cast, Packet} || Packet <- lists:reverse(Packets)].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% packet_id generation
|
|
|
|
bump_last_packet_id(State = #state{last_packet_id = Id}) ->
|
|
State#state{last_packet_id = next_packet_id(Id)}.
|
|
|
|
-spec next_packet_id(packet_id()) -> packet_id().
|
|
next_packet_id(?MAX_PACKET_ID) -> 1;
|
|
next_packet_id(Id) -> Id + 1.
|
|
|