refactor(gw): use typical conn&channel to implement mqtt-sn gateway
This commit is contained in:
parent
45912d8a81
commit
0f9b5ff3a1
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,802 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc MQTT-SN Connection process
|
||||
-module(emqx_sn_conn).
|
||||
|
||||
-include_lib("emqx/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-logger_header("[SN-Conn]").
|
||||
|
||||
%% API
|
||||
-export([ start_link/3
|
||||
, stop/1
|
||||
]).
|
||||
|
||||
-export([ info/1
|
||||
, stats/1
|
||||
]).
|
||||
|
||||
-export([ call/2
|
||||
, call/3
|
||||
, cast/2
|
||||
]).
|
||||
|
||||
%% Callback
|
||||
-export([init/4]).
|
||||
|
||||
%% Sys callbacks
|
||||
-export([ system_continue/3
|
||||
, system_terminate/4
|
||||
, system_code_change/4
|
||||
, system_get_state/1
|
||||
]).
|
||||
|
||||
%% Internal callback
|
||||
-export([wakeup_from_hib/2, recvloop/2]).
|
||||
|
||||
-import(emqx_misc, [start_timer/2]).
|
||||
|
||||
-record(state, {
|
||||
%% TCP/SSL/UDP/DTLS Wrapped Socket
|
||||
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
|
||||
%% Peername of the connection
|
||||
peername :: emqx_types:peername(),
|
||||
%% Sockname of the connection
|
||||
sockname :: emqx_types:peername(),
|
||||
%% Sock State
|
||||
sockstate :: emqx_types:sockstate(),
|
||||
%% The {active, N} option
|
||||
active_n :: pos_integer(),
|
||||
%% Limiter
|
||||
limiter :: maybe(emqx_limiter:limiter()),
|
||||
%% Limit Timer
|
||||
limit_timer :: maybe(reference()),
|
||||
%% Parse State
|
||||
parse_state :: emqx_sn_frame:parse_state(),
|
||||
%% Serialize options
|
||||
serialize :: emqx_sn_frame:serialize_opts(),
|
||||
%% Channel State
|
||||
channel :: emqx_sn_channel:channel(),
|
||||
%% GC State
|
||||
gc_state :: maybe(emqx_gc:gc_state()),
|
||||
%% Stats Timer
|
||||
stats_timer :: disabled | maybe(reference()),
|
||||
%% Idle Timeout
|
||||
idle_timeout :: integer(),
|
||||
%% Idle Timer
|
||||
idle_timer :: maybe(reference())
|
||||
}).
|
||||
|
||||
-type(state() :: #state{}).
|
||||
|
||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
||||
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
||||
|
||||
-define(ENABLED(X), (X =/= undefined)).
|
||||
|
||||
-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
|
||||
-define(DEFAULT_IDLE_TIMEOUT, 30000).
|
||||
-define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,
|
||||
message_queue_len => 32000}).
|
||||
|
||||
-dialyzer({nowarn_function,
|
||||
[ system_terminate/4
|
||||
, handle_call/3
|
||||
, handle_msg/2
|
||||
, shutdown/3
|
||||
, stop/3
|
||||
]}).
|
||||
|
||||
%% udp
|
||||
start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
|
||||
Args = [self(), Socket, Peername, Options],
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
|
||||
|
||||
%% tcp/ssl/dtls
|
||||
start_link(esockd_transport, Sock, Options) ->
|
||||
Socket = {esockd_transport, Sock},
|
||||
case esockd_transport:peername(Sock) of
|
||||
{ok, Peername} ->
|
||||
Args = [self(), Socket, Peername, Options],
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
|
||||
R = {error, _} -> R
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Get infos of the connection/channel.
|
||||
-spec(info(pid()|state()) -> emqx_types:infos()).
|
||||
info(CPid) when is_pid(CPid) ->
|
||||
call(CPid, info);
|
||||
info(State = #state{channel = Channel}) ->
|
||||
ChanInfo = emqx_sn_channel:info(Channel),
|
||||
SockInfo = maps:from_list(
|
||||
info(?INFO_KEYS, State)),
|
||||
ChanInfo#{sockinfo => SockInfo}.
|
||||
|
||||
info(Keys, State) when is_list(Keys) ->
|
||||
[{Key, info(Key, State)} || Key <- Keys];
|
||||
info(socktype, #state{socket = Socket}) ->
|
||||
esockd_type(Socket);
|
||||
info(peername, #state{peername = Peername}) ->
|
||||
Peername;
|
||||
info(sockname, #state{sockname = Sockname}) ->
|
||||
Sockname;
|
||||
info(sockstate, #state{sockstate = SockSt}) ->
|
||||
SockSt;
|
||||
info(active_n, #state{active_n = ActiveN}) ->
|
||||
ActiveN.
|
||||
|
||||
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
||||
stats(CPid) when is_pid(CPid) ->
|
||||
call(CPid, stats);
|
||||
stats(#state{socket = Socket,
|
||||
channel = Channel}) ->
|
||||
SockStats = case esockd_getstat(Socket, ?SOCK_STATS) of
|
||||
{ok, Ss} -> Ss;
|
||||
{error, _} -> []
|
||||
end,
|
||||
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
||||
ChanStats = emqx_sn_channel:stats(Channel),
|
||||
ProcStats = emqx_misc:proc_stats(),
|
||||
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
||||
|
||||
call(Pid, Req) ->
|
||||
call(Pid, Req, infinity).
|
||||
|
||||
call(Pid, Req, Timeout) ->
|
||||
gen_server:call(Pid, Req, Timeout).
|
||||
|
||||
cast(Pid, Req) ->
|
||||
gen_server:cast(Pid, Req).
|
||||
|
||||
stop(Pid) ->
|
||||
gen_server:stop(Pid).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Wrapped funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
|
||||
{ok, Socket};
|
||||
esockd_wait({esockd_transport, Sock}) ->
|
||||
case esockd_transport:wait(Sock) of
|
||||
{ok, NSock} -> {ok, {esockd_transport, NSock}};
|
||||
R = {error, _} -> R
|
||||
end.
|
||||
|
||||
esockd_close({udp, _SockPid, _Sock}) ->
|
||||
%% nothing to do for udp socket
|
||||
%%gen_udp:close(Sock);
|
||||
ok;
|
||||
esockd_close({esockd_transport, Sock}) ->
|
||||
esockd_transport:fast_close(Sock).
|
||||
|
||||
esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
|
||||
nossl;
|
||||
esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
|
||||
esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
|
||||
esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
|
||||
esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
|
||||
|
||||
esockd_type({udp, _, _}) ->
|
||||
udp;
|
||||
esockd_type({esockd_transport, Socket}) ->
|
||||
esockd_transport:type(Socket).
|
||||
|
||||
esockd_setopts({udp, _, _}, _) ->
|
||||
ok;
|
||||
esockd_setopts({esockd_transport, Socket}, Opts) ->
|
||||
%% FIXME: DTLS works??
|
||||
esockd_transport:setopts(Socket, Opts).
|
||||
|
||||
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
|
||||
inet:getstat(Sock, Stats);
|
||||
esockd_getstat({esockd_transport, Sock}, Stats) ->
|
||||
esockd_transport:getstat(Sock, Stats).
|
||||
|
||||
esockd_send(Data, #state{socket = {udp, _SockPid, Sock},
|
||||
peername = {Ip, Port}}) ->
|
||||
gen_udp:send(Sock, Ip, Port, Data);
|
||||
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
|
||||
esockd_transport:async_send(Sock, Data).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init(Parent, WrappedSock, Peername, Options) ->
|
||||
case esockd_wait(WrappedSock) of
|
||||
{ok, NWrappedSock} ->
|
||||
run_loop(Parent, init_state(NWrappedSock, Peername, Options));
|
||||
{error, Reason} ->
|
||||
ok = esockd_close(WrappedSock),
|
||||
exit_on_sock_error(Reason)
|
||||
end.
|
||||
|
||||
init_state(WrappedSock, Peername, Options) ->
|
||||
{ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock),
|
||||
Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock),
|
||||
ConnInfo = #{socktype => esockd_type(WrappedSock),
|
||||
peername => Peername,
|
||||
sockname => Sockname,
|
||||
peercert => Peercert,
|
||||
conn_mod => ?MODULE
|
||||
},
|
||||
ActiveN = emqx_gateway_utils:active_n(Options),
|
||||
%% FIXME:
|
||||
%%Limiter = emqx_limiter:init(Options),
|
||||
Limiter = undefined,
|
||||
FrameOpts = emqx_gateway_utils:frame_options(Options),
|
||||
ParseState = emqx_sn_frame:initial_parse_state(FrameOpts),
|
||||
Serialize = emqx_sn_frame:serialize_opts(),
|
||||
Channel = emqx_sn_channel:init(ConnInfo, Options),
|
||||
GcState = emqx_gateway_utils:init_gc_state(Options),
|
||||
StatsTimer = emqx_gateway_utils:stats_timer(Options),
|
||||
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
|
||||
IdleTimer = start_timer(IdleTimeout, idle_timeout),
|
||||
#state{socket = WrappedSock,
|
||||
peername = Peername,
|
||||
sockname = Sockname,
|
||||
sockstate = idle,
|
||||
active_n = ActiveN,
|
||||
limiter = Limiter,
|
||||
parse_state = ParseState,
|
||||
serialize = Serialize,
|
||||
channel = Channel,
|
||||
gc_state = GcState,
|
||||
stats_timer = StatsTimer,
|
||||
idle_timeout = IdleTimeout,
|
||||
idle_timer = IdleTimer
|
||||
}.
|
||||
|
||||
run_loop(Parent, State = #state{socket = Socket,
|
||||
peername = Peername}) ->
|
||||
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
||||
_ = emqx_misc:tune_heap_size(?DEFAULT_OOM_POLICY),
|
||||
case activate_socket(State) of
|
||||
{ok, NState} ->
|
||||
hibernate(Parent, NState);
|
||||
{error, Reason} ->
|
||||
ok = esockd_close(Socket),
|
||||
exit_on_sock_error(Reason)
|
||||
end.
|
||||
|
||||
-spec exit_on_sock_error(atom()) -> no_return().
|
||||
exit_on_sock_error(Reason) when Reason =:= einval;
|
||||
Reason =:= enotconn;
|
||||
Reason =:= closed ->
|
||||
erlang:exit(normal);
|
||||
exit_on_sock_error(timeout) ->
|
||||
erlang:exit({shutdown, ssl_upgrade_timeout});
|
||||
exit_on_sock_error(Reason) ->
|
||||
erlang:exit({shutdown, Reason}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Recv Loop
|
||||
|
||||
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
||||
receive
|
||||
Msg ->
|
||||
handle_recv(Msg, Parent, State)
|
||||
after
|
||||
IdleTimeout + 100 ->
|
||||
hibernate(Parent, cancel_stats_timer(State))
|
||||
end.
|
||||
|
||||
handle_recv({system, From, Request}, Parent, State) ->
|
||||
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
|
||||
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
|
||||
%% FIXME: it's not trapping exit, should never receive an EXIT
|
||||
terminate(Reason, State);
|
||||
handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
||||
case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
|
||||
{ok, NewState} ->
|
||||
?MODULE:recvloop(Parent, NewState);
|
||||
{stop, Reason, NewSate} ->
|
||||
terminate(Reason, NewSate)
|
||||
end.
|
||||
|
||||
hibernate(Parent, State) ->
|
||||
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
|
||||
|
||||
%% Maybe do something here later.
|
||||
wakeup_from_hib(Parent, State) ->
|
||||
?MODULE:recvloop(Parent, State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure/cancel stats timer
|
||||
|
||||
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
|
||||
State#state{stats_timer = start_timer(Timeout, emit_stats)};
|
||||
ensure_stats_timer(_Timeout, State) -> State.
|
||||
|
||||
cancel_stats_timer(State = #state{stats_timer = TRef})
|
||||
when is_reference(TRef) ->
|
||||
ok = emqx_misc:cancel_timer(TRef),
|
||||
State#state{stats_timer = undefined};
|
||||
cancel_stats_timer(State) -> State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process next Msg
|
||||
|
||||
process_msg([], State) ->
|
||||
{ok, State};
|
||||
process_msg([Msg|More], State) ->
|
||||
try
|
||||
case handle_msg(Msg, State) of
|
||||
ok ->
|
||||
process_msg(More, State);
|
||||
{ok, NState} ->
|
||||
process_msg(More, NState);
|
||||
{ok, Msgs, NState} ->
|
||||
process_msg(append_msg(More, Msgs), NState);
|
||||
{stop, Reason, NState} ->
|
||||
{stop, Reason, NState}
|
||||
end
|
||||
catch
|
||||
exit : normal ->
|
||||
{stop, normal, State};
|
||||
exit : shutdown ->
|
||||
{stop, shutdown, State};
|
||||
exit : {shutdown, _} = Shutdown ->
|
||||
{stop, Shutdown, State};
|
||||
Exception : Context : Stack ->
|
||||
{stop, #{exception => Exception,
|
||||
context => Context,
|
||||
stacktrace => Stack}, State}
|
||||
end.
|
||||
|
||||
append_msg([], Msgs) when is_list(Msgs) ->
|
||||
Msgs;
|
||||
append_msg([], Msg) -> [Msg];
|
||||
append_msg(Q, Msgs) when is_list(Msgs) ->
|
||||
lists:append(Q, Msgs);
|
||||
append_msg(Q, Msg) ->
|
||||
lists:append(Q, [Msg]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle a Msg
|
||||
|
||||
handle_msg({'$gen_call', From, Req}, State) ->
|
||||
case handle_call(From, Req, State) of
|
||||
{reply, Reply, NState} ->
|
||||
gen_server:reply(From, Reply),
|
||||
{ok, NState};
|
||||
{reply, Reply, Msgs, NState} ->
|
||||
gen_server:reply(From, Reply),
|
||||
{ok, next_msgs(Msgs), NState};
|
||||
{stop, Reason, Reply, NState} ->
|
||||
gen_server:reply(From, Reply),
|
||||
stop(Reason, NState)
|
||||
end;
|
||||
|
||||
handle_msg({'$gen_cast', Req}, State) ->
|
||||
with_channel(handle_cast, [Req], State);
|
||||
|
||||
handle_msg({datagram, _SockPid, Data}, State) ->
|
||||
parse_incoming(Data, State);
|
||||
|
||||
handle_msg({Inet, _Sock, Data}, State)
|
||||
when Inet == tcp;
|
||||
Inet == ssl ->
|
||||
parse_incoming(Data, State);
|
||||
|
||||
handle_msg({incoming, Packet},
|
||||
State = #state{idle_timer = IdleTimer}) ->
|
||||
IdleTimer /= undefined andalso
|
||||
emqx_misc:cancel_timer(IdleTimer),
|
||||
NState = State#state{idle_timer = undefined},
|
||||
handle_incoming(Packet, NState);
|
||||
|
||||
handle_msg({outgoing, Data}, State) ->
|
||||
handle_outgoing(Data, State);
|
||||
|
||||
handle_msg({Error, _Sock, Reason}, State)
|
||||
when Error == tcp_error; Error == ssl_error ->
|
||||
handle_info({sock_error, Reason}, State);
|
||||
|
||||
handle_msg({Closed, _Sock}, State)
|
||||
when Closed == tcp_closed; Closed == ssl_closed ->
|
||||
handle_info({sock_closed, Closed}, close_socket(State));
|
||||
|
||||
%% TODO: udp_passive???
|
||||
handle_msg({Passive, _Sock}, State)
|
||||
when Passive == tcp_passive; Passive == ssl_passive ->
|
||||
%% In Stats
|
||||
Bytes = emqx_pd:reset_counter(incoming_bytes),
|
||||
Pubs = emqx_pd:reset_counter(incoming_pkt),
|
||||
InStats = #{cnt => Pubs, oct => Bytes},
|
||||
%% Ensure Rate Limit
|
||||
NState = ensure_rate_limit(InStats, State),
|
||||
%% Run GC and Check OOM
|
||||
NState1 = check_oom(run_gc(InStats, NState)),
|
||||
handle_info(activate_socket, NState1);
|
||||
|
||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
||||
State = #state{active_n = ActiveN}) ->
|
||||
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
||||
with_channel(handle_deliver, [Delivers], State);
|
||||
|
||||
%% Something sent
|
||||
%% TODO: Who will deliver this message?
|
||||
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
|
||||
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
|
||||
true ->
|
||||
Pubs = emqx_pd:reset_counter(outgoing_pkt),
|
||||
Bytes = emqx_pd:reset_counter(outgoing_bytes),
|
||||
OutStats = #{cnt => Pubs, oct => Bytes},
|
||||
{ok, check_oom(run_gc(OutStats, State))};
|
||||
false -> ok
|
||||
end;
|
||||
|
||||
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
handle_info({sock_error, Reason}, State);
|
||||
|
||||
handle_msg({close, Reason}, State) ->
|
||||
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
||||
handle_info({sock_closed, Reason}, close_socket(State));
|
||||
|
||||
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
||||
emqx_gateway_ctx:insert_channel_info(
|
||||
Ctx,
|
||||
ClientId,
|
||||
info(State),
|
||||
stats(State)
|
||||
);
|
||||
|
||||
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
||||
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
||||
emqx_gateway_ctx:connection_closed(Ctx, ClientId),
|
||||
{ok, State};
|
||||
|
||||
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
||||
emqx_gateway_ctx:set_chan_info(Ctx, ClientId, info(State)),
|
||||
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
||||
{ok, State};
|
||||
|
||||
handle_msg({timeout, TRef, TMsg}, State) ->
|
||||
handle_timeout(TRef, TMsg, State);
|
||||
|
||||
handle_msg(Shutdown = {shutdown, _Reason}, State) ->
|
||||
stop(Shutdown, State);
|
||||
|
||||
handle_msg(Msg, State) ->
|
||||
handle_info(Msg, State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Terminate
|
||||
|
||||
-spec terminate(atom(), state()) -> no_return().
|
||||
terminate(Reason, State = #state{channel = Channel}) ->
|
||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
||||
_ = emqx_sn_channel:terminate(Reason, Channel),
|
||||
_ = close_socket(State),
|
||||
exit(Reason).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Sys callbacks
|
||||
|
||||
system_continue(Parent, _Debug, State) ->
|
||||
recvloop(Parent, State).
|
||||
|
||||
system_terminate(Reason, _Parent, _Debug, State) ->
|
||||
terminate(Reason, State).
|
||||
|
||||
system_code_change(State, _Mod, _OldVsn, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
system_get_state(State) -> {ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle call
|
||||
|
||||
handle_call(_From, info, State) ->
|
||||
{reply, info(State), State};
|
||||
|
||||
handle_call(_From, stats, State) ->
|
||||
{reply, stats(State), State};
|
||||
|
||||
handle_call(_From, Req, State = #state{channel = Channel}) ->
|
||||
case emqx_sn_channel:handle_call(Req, Channel) of
|
||||
{reply, Reply, NChannel} ->
|
||||
{reply, Reply, State#state{channel = NChannel}};
|
||||
{reply, Reply, Replies, NChannel} ->
|
||||
{reply, Reply, Replies, State#state{channel = NChannel}};
|
||||
{shutdown, Reason, Reply, NChannel} ->
|
||||
shutdown(Reason, Reply, State#state{channel = NChannel})
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle timeout
|
||||
|
||||
handle_timeout(_TRef, idle_timeout, State) ->
|
||||
shutdown(idle_timeout, State);
|
||||
|
||||
handle_timeout(_TRef, limit_timeout, State) ->
|
||||
NState = State#state{sockstate = idle,
|
||||
limit_timer = undefined
|
||||
},
|
||||
handle_info(activate_socket, NState);
|
||||
handle_timeout(TRef, keepalive, State = #state{socket = Socket,
|
||||
channel = Channel})->
|
||||
case emqx_sn_channel:info(conn_state, Channel) of
|
||||
disconnected -> {ok, State};
|
||||
_ ->
|
||||
case esockd_getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} ->
|
||||
handle_timeout(TRef, {keepalive, RecvOct}, State);
|
||||
{error, Reason} ->
|
||||
handle_info({sock_error, Reason}, State)
|
||||
end
|
||||
end;
|
||||
handle_timeout(_TRef, emit_stats, State =
|
||||
#state{channel = Channel}) ->
|
||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
||||
ClientId = emqx_sn_channel:info(clientid, Channel),
|
||||
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
||||
{ok, State#state{stats_timer = undefined}};
|
||||
|
||||
handle_timeout(TRef, Msg, State) ->
|
||||
with_channel(handle_timeout, [TRef, Msg], State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Parse incoming data
|
||||
|
||||
parse_incoming(Data, State = #state{channel = Channel}) ->
|
||||
?LOG(debug, "RECV ~0p", [Data]),
|
||||
Oct = iolist_size(Data),
|
||||
inc_counter(incoming_bytes, Oct),
|
||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
|
||||
{Packets, NState} = parse_incoming(Data, [], State),
|
||||
{ok, next_incoming_msgs(Packets), NState}.
|
||||
|
||||
parse_incoming(<<>>, Packets, State) ->
|
||||
{Packets, State};
|
||||
|
||||
parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||
try emqx_sn_frame:parse(Data, ParseState) of
|
||||
{more, NParseState} ->
|
||||
{Packets, State#state{parse_state = NParseState}};
|
||||
{ok, Packet, Rest, NParseState} ->
|
||||
NState = State#state{parse_state = NParseState},
|
||||
parse_incoming(Rest, [Packet|Packets], NState)
|
||||
catch
|
||||
error:Reason:Stk ->
|
||||
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
|
||||
[Reason, Stk, Data]),
|
||||
{[{frame_error, Reason}|Packets], State}
|
||||
end.
|
||||
|
||||
next_incoming_msgs([Packet]) ->
|
||||
{incoming, Packet};
|
||||
next_incoming_msgs(Packets) ->
|
||||
[{incoming, Packet} || Packet <- lists:reverse(Packets)].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle incoming packet
|
||||
|
||||
handle_incoming(Packet, State) ->
|
||||
ok = inc_incoming_stats(Packet),
|
||||
?LOG(debug, "RECV ~s", [emqx_sn_frame:format(Packet)]),
|
||||
with_channel(handle_in, [Packet], State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% With Channel
|
||||
|
||||
with_channel(Fun, Args, State = #state{channel = Channel}) ->
|
||||
case erlang:apply(emqx_sn_channel, Fun, Args ++ [Channel]) of
|
||||
ok -> {ok, State};
|
||||
{ok, NChannel} ->
|
||||
{ok, State#state{channel = NChannel}};
|
||||
{ok, Replies, NChannel} ->
|
||||
{ok, next_msgs(Replies), State#state{channel = NChannel}};
|
||||
{shutdown, Reason, NChannel} ->
|
||||
shutdown(Reason, State#state{channel = NChannel});
|
||||
{shutdown, Reason, Packet, NChannel} ->
|
||||
NState = State#state{channel = NChannel},
|
||||
ok = handle_outgoing(Packet, NState),
|
||||
shutdown(Reason, NState)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle outgoing packets
|
||||
|
||||
handle_outgoing(Packets, State) when is_list(Packets) ->
|
||||
send(lists:map(serialize_and_inc_stats_fun(State), Packets), State);
|
||||
|
||||
handle_outgoing(Packet, State) ->
|
||||
send((serialize_and_inc_stats_fun(State))(Packet), State).
|
||||
|
||||
serialize_and_inc_stats_fun(#state{serialize = Serialize, channel = Channel}) ->
|
||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
||||
fun(Packet) ->
|
||||
case emqx_sn_frame:serialize_pkt(Packet, Serialize) of
|
||||
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
|
||||
[emqx_sn_frame:format(Packet)]),
|
||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
|
||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
|
||||
<<>>;
|
||||
Data -> ?LOG(debug, "SEND ~s", [emqx_sn_frame:format(Packet)]),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
Data
|
||||
end
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Send data
|
||||
|
||||
-spec(send(iodata(), state()) -> ok).
|
||||
send(IoData, State = #state{socket = Socket, channel = Channel}) ->
|
||||
Ctx = emqx_sn_channel:info(ctx, Channel),
|
||||
Oct = iolist_size(IoData),
|
||||
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct),
|
||||
inc_counter(outgoing_bytes, Oct),
|
||||
case esockd_send(IoData, State) of
|
||||
ok -> ok;
|
||||
Error = {error, _Reason} ->
|
||||
%% Send an inet_reply to postpone handling the error
|
||||
self() ! {inet_reply, Socket, Error},
|
||||
ok
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle Info
|
||||
|
||||
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||
case activate_socket(State) of
|
||||
{ok, NState = #state{sockstate = NewSst}} ->
|
||||
if OldSst =/= NewSst ->
|
||||
{ok, {event, NewSst}, NState};
|
||||
true -> {ok, NState}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
handle_info({sock_error, Reason}, State)
|
||||
end;
|
||||
|
||||
handle_info({sock_error, Reason}, State) ->
|
||||
?LOG(debug, "Socket error: ~p", [Reason]),
|
||||
handle_info({sock_closed, Reason}, close_socket(State));
|
||||
|
||||
handle_info(Info, State) ->
|
||||
with_channel(handle_info, [Info], State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure rate limit
|
||||
|
||||
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
||||
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
||||
false -> State;
|
||||
{ok, Limiter1} ->
|
||||
State#state{limiter = Limiter1};
|
||||
{pause, Time, Limiter1} ->
|
||||
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
||||
TRef = start_timer(Time, limit_timeout),
|
||||
State#state{sockstate = blocked,
|
||||
limiter = Limiter1,
|
||||
limit_timer = TRef
|
||||
}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Run GC and Check OOM
|
||||
|
||||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||
false -> State;
|
||||
{_IsGC, GcSt1} ->
|
||||
State#state{gc_state = GcSt1}
|
||||
end.
|
||||
|
||||
check_oom(State) ->
|
||||
OomPolicy = ?DEFAULT_OOM_POLICY,
|
||||
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
|
||||
Shutdown = {shutdown, _Reason} ->
|
||||
erlang:send(self(), Shutdown);
|
||||
_Other -> ok
|
||||
end,
|
||||
State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Activate Socket
|
||||
|
||||
activate_socket(State = #state{sockstate = closed}) ->
|
||||
{ok, State};
|
||||
activate_socket(State = #state{sockstate = blocked}) ->
|
||||
{ok, State};
|
||||
activate_socket(State = #state{socket = Socket,
|
||||
active_n = N}) ->
|
||||
%% FIXME: Works on dtls/udp ???
|
||||
%% How to hanlde buffer?
|
||||
case esockd_setopts(Socket, [{active, N}]) of
|
||||
ok -> {ok, State#state{sockstate = running}};
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Close Socket
|
||||
|
||||
close_socket(State = #state{sockstate = closed}) -> State;
|
||||
close_socket(State = #state{socket = Socket}) ->
|
||||
ok = esockd_close(Socket),
|
||||
State#state{sockstate = closed}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Inc incoming/outgoing stats
|
||||
|
||||
%% XXX: How to stats?
|
||||
inc_incoming_stats(_Packet) ->
|
||||
inc_counter(recv_pkt, 1),
|
||||
ok.
|
||||
%case Type =:= ?CMD_SEND of
|
||||
% true ->
|
||||
% inc_counter(recv_msg, 1),
|
||||
% inc_counter(incoming_pubs, 1);
|
||||
% false ->
|
||||
% ok
|
||||
%end,
|
||||
%emqx_metrics:inc_recv(Packet).
|
||||
|
||||
inc_outgoing_stats(_Packet) ->
|
||||
inc_counter(send_pkt, 1),
|
||||
ok.
|
||||
%case Type =:= ?CMD_MESSAGE of
|
||||
% true ->
|
||||
% inc_counter(send_msg, 1),
|
||||
% inc_counter(outgoing_pubs, 1);
|
||||
% false ->
|
||||
% ok
|
||||
%end,
|
||||
%emqx_metrics:inc_sent(Packet).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
|
||||
-compile({inline, [next_msgs/1]}).
|
||||
next_msgs(Event) when is_tuple(Event) ->
|
||||
Event;
|
||||
next_msgs(More) when is_list(More) ->
|
||||
More.
|
||||
|
||||
-compile({inline, [shutdown/2, shutdown/3]}).
|
||||
shutdown(Reason, State) ->
|
||||
stop({shutdown, Reason}, State).
|
||||
|
||||
shutdown(Reason, Reply, State) ->
|
||||
stop({shutdown, Reason}, Reply, State).
|
||||
|
||||
-compile({inline, [stop/2, stop/3]}).
|
||||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
stop(Reason, Reply, State) ->
|
||||
{stop, Reason, Reply, State}.
|
||||
|
||||
inc_counter(Name, Value) ->
|
||||
_ = emqx_pd:inc_counter(Name, Value),
|
||||
ok.
|
|
@ -19,8 +19,10 @@
|
|||
|
||||
-include("src/mqttsn/include/emqx_sn.hrl").
|
||||
|
||||
-export([ parse/1
|
||||
, serialize/1
|
||||
-export([ initial_parse_state/1
|
||||
, serialize_opts/0
|
||||
, parse/2
|
||||
, serialize_pkt/2
|
||||
, message_type/1
|
||||
, format/1
|
||||
]).
|
||||
|
@ -29,17 +31,33 @@
|
|||
-define(byte, 8/big-integer).
|
||||
-define(short, 16/big-integer).
|
||||
|
||||
-type parse_state() :: #{}.
|
||||
-type serialize_opts() :: #{}.
|
||||
|
||||
-export_type([ parse_state/0
|
||||
, serialize_opts/0
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Initial
|
||||
|
||||
initial_parse_state(_) ->
|
||||
#{}.
|
||||
|
||||
serialize_opts() ->
|
||||
#{}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Parse MQTT-SN Message
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
parse(<<16#01:?byte, Len:?short, Type:?byte, Var/binary>>) ->
|
||||
parse(Type, Len - 4, Var);
|
||||
parse(<<Len:?byte, Type:?byte, Var/binary>>) ->
|
||||
parse(Type, Len - 2, Var).
|
||||
parse(<<16#01:?byte, Len:?short, Type:?byte, Var/binary>>, _State) ->
|
||||
{ok, parse(Type, Len - 4, Var), <<>>, _State};
|
||||
parse(<<Len:?byte, Type:?byte, Var/binary>>, _State) ->
|
||||
{ok, parse(Type, Len - 2, Var), <<>>, _State}.
|
||||
|
||||
parse(Type, Len, Var) when Len =:= size(Var) ->
|
||||
{ok, #mqtt_sn_message{type = Type, variable = parse_var(Type, Var)}};
|
||||
#mqtt_sn_message{type = Type, variable = parse_var(Type, Var)};
|
||||
parse(_Type, _Len, _Var) ->
|
||||
error(malformed_message_len).
|
||||
|
||||
|
@ -127,70 +145,70 @@ parse_topic(2#11, Topic) -> Topic.
|
|||
%% Serialize MQTT-SN Message
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
serialize(#mqtt_sn_message{type = Type, variable = Var}) ->
|
||||
VarBin = serialize(Type, Var), VarLen = size(VarBin),
|
||||
serialize_pkt(#mqtt_sn_message{type = Type, variable = Var}, Opts) ->
|
||||
VarBin = serialize(Type, Var, Opts), VarLen = size(VarBin),
|
||||
if
|
||||
VarLen < 254 -> <<(VarLen + 2), Type, VarBin/binary>>;
|
||||
true -> <<16#01, (VarLen + 4):?short, Type, VarBin/binary>>
|
||||
end.
|
||||
|
||||
serialize(?SN_ADVERTISE, {GwId, Duration}) ->
|
||||
serialize(?SN_ADVERTISE, {GwId, Duration}, _Opts) ->
|
||||
<<GwId, Duration:?short>>;
|
||||
serialize(?SN_SEARCHGW, Radius) ->
|
||||
serialize(?SN_SEARCHGW, Radius, _Opts) ->
|
||||
<<Radius>>;
|
||||
serialize(?SN_GWINFO, {GwId, GwAdd}) ->
|
||||
serialize(?SN_GWINFO, {GwId, GwAdd}, _Opts) ->
|
||||
<<GwId, GwAdd/binary>>;
|
||||
serialize(?SN_CONNECT, {Flags, ProtocolId, Duration, ClientId}) ->
|
||||
serialize(?SN_CONNECT, {Flags, ProtocolId, Duration, ClientId}, _Opts) ->
|
||||
<<(serialize_flags(Flags))/binary, ProtocolId, Duration:?short, ClientId/binary>>;
|
||||
serialize(?SN_CONNACK, ReturnCode) ->
|
||||
serialize(?SN_CONNACK, ReturnCode, _Opts) ->
|
||||
<<ReturnCode>>;
|
||||
serialize(?SN_WILLTOPICREQ, _) ->
|
||||
serialize(?SN_WILLTOPICREQ, _, _Opts) ->
|
||||
<<>>;
|
||||
serialize(?SN_WILLTOPIC, undefined) ->
|
||||
serialize(?SN_WILLTOPIC, undefined, _Opts) ->
|
||||
<<>>;
|
||||
serialize(?SN_WILLTOPIC, {Flags, Topic}) ->
|
||||
serialize(?SN_WILLTOPIC, {Flags, Topic}, _Opts) ->
|
||||
%% The WillTopic must a short topic name
|
||||
<<(serialize_flags(Flags))/binary, Topic/binary>>;
|
||||
serialize(?SN_WILLMSGREQ, _) ->
|
||||
serialize(?SN_WILLMSGREQ, _, _Opts) ->
|
||||
<<>>;
|
||||
serialize(?SN_WILLMSG, WillMsg) ->
|
||||
serialize(?SN_WILLMSG, WillMsg, _Opts) ->
|
||||
WillMsg;
|
||||
serialize(?SN_REGISTER, {TopicId, MsgId, TopicName}) ->
|
||||
serialize(?SN_REGISTER, {TopicId, MsgId, TopicName}, _Opts) ->
|
||||
<<TopicId:?short, MsgId:?short, TopicName/binary>>;
|
||||
serialize(?SN_REGACK, {TopicId, MsgId, ReturnCode}) ->
|
||||
serialize(?SN_REGACK, {TopicId, MsgId, ReturnCode}, _Opts) ->
|
||||
<<TopicId:?short, MsgId:?short, ReturnCode>>;
|
||||
serialize(?SN_PUBLISH, {Flags=#mqtt_sn_flags{topic_id_type = ?SN_NORMAL_TOPIC}, TopicId, MsgId, Data}) ->
|
||||
serialize(?SN_PUBLISH, {Flags=#mqtt_sn_flags{topic_id_type = ?SN_NORMAL_TOPIC}, TopicId, MsgId, Data}, _Opts) ->
|
||||
<<(serialize_flags(Flags))/binary, TopicId:?short, MsgId:?short, Data/binary>>;
|
||||
serialize(?SN_PUBLISH, {Flags=#mqtt_sn_flags{topic_id_type = ?SN_PREDEFINED_TOPIC}, TopicId, MsgId, Data}) ->
|
||||
serialize(?SN_PUBLISH, {Flags=#mqtt_sn_flags{topic_id_type = ?SN_PREDEFINED_TOPIC}, TopicId, MsgId, Data}, _Opts) ->
|
||||
<<(serialize_flags(Flags))/binary, TopicId:?short, MsgId:?short, Data/binary>>;
|
||||
serialize(?SN_PUBLISH, {Flags=#mqtt_sn_flags{topic_id_type = ?SN_SHORT_TOPIC}, STopicName, MsgId, Data}) ->
|
||||
serialize(?SN_PUBLISH, {Flags=#mqtt_sn_flags{topic_id_type = ?SN_SHORT_TOPIC}, STopicName, MsgId, Data}, _Opts) ->
|
||||
<<(serialize_flags(Flags))/binary, STopicName:2/binary, MsgId:?short, Data/binary>>;
|
||||
serialize(?SN_PUBACK, {TopicId, MsgId, ReturnCode}) ->
|
||||
serialize(?SN_PUBACK, {TopicId, MsgId, ReturnCode}, _Opts) ->
|
||||
<<TopicId:?short, MsgId:?short, ReturnCode>>;
|
||||
serialize(PubRec, MsgId) when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
|
||||
serialize(PubRec, MsgId, _Opts) when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
|
||||
<<MsgId:?short>>;
|
||||
serialize(Sub, {Flags = #mqtt_sn_flags{topic_id_type = IdType}, MsgId, Topic})
|
||||
serialize(Sub, {Flags = #mqtt_sn_flags{topic_id_type = IdType}, MsgId, Topic}, _Opts)
|
||||
when Sub == ?SN_SUBSCRIBE; Sub == ?SN_UNSUBSCRIBE ->
|
||||
<<(serialize_flags(Flags))/binary, MsgId:16, (serialize_topic(IdType, Topic))/binary>>;
|
||||
serialize(?SN_SUBACK, {Flags, TopicId, MsgId, ReturnCode}) ->
|
||||
serialize(?SN_SUBACK, {Flags, TopicId, MsgId, ReturnCode}, _Opts) ->
|
||||
<<(serialize_flags(Flags))/binary, TopicId:?short, MsgId:?short, ReturnCode>>;
|
||||
serialize(?SN_UNSUBACK, MsgId) ->
|
||||
serialize(?SN_UNSUBACK, MsgId, _Opts) ->
|
||||
<<MsgId:?short>>;
|
||||
serialize(?SN_PINGREQ, ClientId) ->
|
||||
serialize(?SN_PINGREQ, ClientId, _Opts) ->
|
||||
ClientId;
|
||||
serialize(?SN_PINGRESP, _) ->
|
||||
serialize(?SN_PINGRESP, _, _Opts) ->
|
||||
<<>>;
|
||||
serialize(?SN_WILLTOPICUPD, {Flags, WillTopic}) ->
|
||||
serialize(?SN_WILLTOPICUPD, {Flags, WillTopic}, _Opts) ->
|
||||
<<(serialize_flags(Flags))/binary, WillTopic/binary>>;
|
||||
serialize(?SN_WILLMSGUPD, WillMsg) ->
|
||||
serialize(?SN_WILLMSGUPD, WillMsg, _Opts) ->
|
||||
WillMsg;
|
||||
serialize(?SN_WILLTOPICRESP, ReturnCode) ->
|
||||
serialize(?SN_WILLTOPICRESP, ReturnCode, _Opts) ->
|
||||
<<ReturnCode>>;
|
||||
serialize(?SN_WILLMSGRESP, ReturnCode) ->
|
||||
serialize(?SN_WILLMSGRESP, ReturnCode, _Opts) ->
|
||||
<<ReturnCode>>;
|
||||
serialize(?SN_DISCONNECT, undefined) ->
|
||||
serialize(?SN_DISCONNECT, undefined, _Opts) ->
|
||||
<<>>;
|
||||
serialize(?SN_DISCONNECT, Duration) ->
|
||||
serialize(?SN_DISCONNECT, Duration, _Opts) ->
|
||||
<<Duration:?short>>.
|
||||
|
||||
serialize_flags(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will,
|
||||
|
|
|
@ -129,7 +129,7 @@ start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
|
|||
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
|
||||
Name = name(InstaId, Type),
|
||||
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
|
||||
{emqx_sn_gateway, start_link, [Cfg#{ctx => Ctx}]}).
|
||||
{emqx_sn_conn, start_link, [Cfg#{ctx => Ctx}]}).
|
||||
|
||||
name(InstaId, Type) ->
|
||||
list_to_atom(lists:concat([InstaId, ":", Type])).
|
||||
|
|
|
@ -49,14 +49,32 @@
|
|||
|
||||
-type(mqtt_sn_type() :: ?SN_ADVERTISE..?SN_WILLMSGRESP).
|
||||
|
||||
-define(SN_RC_ACCEPTED, 16#00).
|
||||
-define(SN_RC_ACCEPTED, 16#00).
|
||||
-define(SN_RC_CONGESTION, 16#01).
|
||||
-define(SN_RC_INVALID_TOPIC_ID, 16#02).
|
||||
-define(SN_RC_NOT_SUPPORTED, 16#03).
|
||||
%% Custome Reason code by emqx
|
||||
-define(SN_RC_NOT_AUTHORIZE, 16#04).
|
||||
-define(SN_RC_FAILED_SESSION, 16#05).
|
||||
-define(SN_EXCEED_LIMITATION, 16#06).
|
||||
|
||||
-define(SN_RC_NAME(Rc),
|
||||
(begin
|
||||
case Rc of
|
||||
?SN_RC_ACCEPTED -> accepted;
|
||||
?SN_RC_CONGESTION -> rejected_congestion;
|
||||
?SN_RC_INVALID_TOPIC_ID -> rejected_invaild_topic_id;
|
||||
?SN_RC_NOT_SUPPORTED -> rejected_not_supported;
|
||||
?SN_RC_NOT_AUTHORIZE -> rejected_not_authorize;
|
||||
?SN_RC_FAILED_SESSION -> rejected_failed_open_session;
|
||||
?SN_EXCEED_LIMITATION -> rejected_exceed_limitation;
|
||||
_ -> reserved
|
||||
end
|
||||
end)).
|
||||
|
||||
-define(QOS_NEG1, 3).
|
||||
|
||||
-type(mqtt_sn_return_code() :: ?SN_RC_ACCEPTED .. ?SN_RC_NOT_SUPPORTED).
|
||||
-type(mqtt_sn_return_code() :: ?SN_RC_ACCEPTED .. ?SN_EXCEED_LIMITATION).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT-SN Message
|
||||
|
@ -139,6 +157,12 @@
|
|||
#mqtt_sn_message{type = ?SN_SUBSCRIBE,
|
||||
variable = {Flags, MsgId, Topic}}).
|
||||
|
||||
-define(SN_SUBSCRIBE_MSG_TYPE(Type, Topic, QoS),
|
||||
#mqtt_sn_message{type = ?SN_SUBSCRIBE,
|
||||
variable = {
|
||||
#mqtt_sn_flags{qos = QoS, topic_id_type = Type},
|
||||
_, Topic}}).
|
||||
|
||||
-define(SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode),
|
||||
#mqtt_sn_message{type = ?SN_SUBACK,
|
||||
variable = {Flags, TopicId, MsgId, ReturnCode}}).
|
||||
|
@ -147,6 +171,12 @@
|
|||
#mqtt_sn_message{type = ?SN_UNSUBSCRIBE,
|
||||
variable = {Flags, MsgId, Topic}}).
|
||||
|
||||
-define(SN_UNSUBSCRIBE_MSG_TYPE(Type, Topic),
|
||||
#mqtt_sn_message{type = ?SN_UNSUBSCRIBE,
|
||||
variable = {
|
||||
#mqtt_sn_flags{topic_id_type = Type},
|
||||
_, Topic}}).
|
||||
|
||||
-define(SN_UNSUBACK_MSG(MsgId),
|
||||
#mqtt_sn_message{type = ?SN_UNSUBACK,
|
||||
variable = MsgId}).
|
||||
|
@ -181,5 +211,4 @@
|
|||
-define(SN_SHORT_TOPIC, 2).
|
||||
-define(SN_RESERVED_TOPIC, 3).
|
||||
|
||||
|
||||
-define(SN_INVALID_TOPIC_ID, 0).
|
||||
|
|
Loading…
Reference in New Issue