style(emqx_gateway): improve some emqx_coap code

This commit is contained in:
lafirest 2021-08-02 16:55:57 +08:00 committed by JianBo He
parent ccf9dd18fb
commit 149ef6d7cc
8 changed files with 179 additions and 182 deletions

View File

@ -36,7 +36,7 @@ gateway: {
subscribe_qos: qos0 subscribe_qos: qos0
publish_qos: qos1 publish_qos: qos1
listener.udp.1: { listener.udp.1: {
bind: 5687 bind: 5683
} }
} }
@ -49,7 +49,7 @@ gateway: {
subscribe_qos: qos2 subscribe_qos: qos2
publish_qos: coap publish_qos: coap
listener.udp.1: { listener.udp.1: {
bind: 5683 bind: 5687
} }
} }

View File

@ -67,9 +67,9 @@
-define(DISCONNECT_WAIT_TIME, timer:seconds(10)). -define(DISCONNECT_WAIT_TIME, timer:seconds(10)).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
%%%=================================================================== %%--------------------------------------------------------------------
%%% API %% API
%%%=================================================================== %%--------------------------------------------------------------------
info(Channel) -> info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)). maps:from_list(info(?INFO_KEYS, Channel)).
@ -101,7 +101,7 @@ init(ConnInfo = #{peername := {PeerHost, _},
ClientInfo = set_peercert_infos( ClientInfo = set_peercert_infos(
Peercert, Peercert,
#{ zone => default #{ zone => default
, protocol => 'mqtt-coap' , protocol => 'coap'
, peerhost => PeerHost , peerhost => PeerHost
, sockport => SockPort , sockport => SockPort
, clientid => emqx_guid:to_base62(emqx_guid:gen()) , clientid => emqx_guid:to_base62(emqx_guid:gen())
@ -132,8 +132,8 @@ auth_subscribe(Topic,
clientinfo := ClientInfo}) -> clientinfo := ClientInfo}) ->
emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic). emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic).
transfer_result(Result, From, Value) -> transfer_result(From, Value, Result) ->
?TRANSFER_RESULT(Result, [out], From, Value). ?TRANSFER_RESULT([out], From, Value, Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet
@ -147,17 +147,17 @@ handle_in(#coap_message{method = post,
<<>> -> <<>> ->
handle_command(Msg, Channel); handle_command(Msg, Channel);
_ -> _ ->
call_session(Channel, received, [Msg]) call_session(received, [Msg], Channel)
end; end;
handle_in(Msg, Channel) -> handle_in(Msg, Channel) ->
call_session(ensure_keepalive_timer(Channel), received, [Msg]). call_session(received, [Msg], ensure_keepalive_timer(Channel)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle Delivers from broker to client %% Handle Delivers from broker to client
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_deliver(Delivers, Channel) -> handle_deliver(Delivers, Channel) ->
call_session(Channel, deliver, [Delivers]). call_session(deliver, [Delivers], Channel).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle timeout %% Handle timeout
@ -165,14 +165,14 @@ handle_deliver(Delivers, Channel) ->
handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) -> handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) ->
case emqx_keepalive:check(NewVal, KeepAlive) of case emqx_keepalive:check(NewVal, KeepAlive) of
{ok, NewKeepAlive} -> {ok, NewKeepAlive} ->
Channel2 = ensure_keepalive_timer(Channel, fun make_timer/4), Channel2 = ensure_keepalive_timer(fun make_timer/4, Channel),
{ok, Channel2#channel{keepalive = NewKeepAlive}}; {ok, Channel2#channel{keepalive = NewKeepAlive}};
{error, timeout} -> {error, timeout} ->
{shutdown, timeout, Channel} {shutdown, timeout, Channel}
end; end;
handle_timeout(_, {transport, Msg}, Channel) -> handle_timeout(_, {transport, Msg}, Channel) ->
call_session(Channel, timeout, [Msg]); call_session(timeout, [Msg], Channel);
handle_timeout(_, disconnect, Channel) -> handle_timeout(_, disconnect, Channel) ->
{shutdown, normal, Channel}; {shutdown, normal, Channel};
@ -207,9 +207,9 @@ handle_info(Info, Channel) ->
terminate(_Reason, _Channel) -> terminate(_Reason, _Channel) ->
ok. ok.
%%%=================================================================== %%--------------------------------------------------------------------
%%% Internal functions %% Internal functions
%%%=================================================================== %%--------------------------------------------------------------------
set_peercert_infos(NoSSL, ClientInfo) set_peercert_infos(NoSSL, ClientInfo)
when NoSSL =:= nossl; when NoSSL =:= nossl;
NoSSL =:= undefined -> NoSSL =:= undefined ->
@ -232,9 +232,9 @@ make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = Timers#{Name => TRef}}. Channel#channel{timers = Timers#{Name => TRef}}.
ensure_keepalive_timer(Channel) -> ensure_keepalive_timer(Channel) ->
ensure_keepalive_timer(Channel, fun ensure_timer/4). ensure_keepalive_timer(fun ensure_timer/4, Channel).
ensure_keepalive_timer(#channel{config = Cfg} = Channel, Fun) -> ensure_keepalive_timer(Fun, #channel{config = Cfg} = Channel) ->
Interval = maps:get(heartbeat, Cfg), Interval = maps:get(heartbeat, Cfg),
Fun(keepalive, Interval, keepalive, Channel). Fun(keepalive, Interval, keepalive, Channel).
@ -285,9 +285,9 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
conninfo = ConnInfo}) -> conninfo = ConnInfo}) ->
ConnProps = #{}, ConnProps = #{},
case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
Error = {error, _Reason} -> Error; Error = {error, _Reason} -> Error;
_NConnProps -> _NConnProps ->
{ok, Input, Channel} {ok, Input, Channel}
end. end.
enrich_clientinfo({Queries, Msg}, enrich_clientinfo({Queries, Msg},
@ -339,11 +339,10 @@ ensure_connected(Channel = #channel{ctx = Ctx,
Channel#channel{conninfo = NConnInfo}. Channel#channel{conninfo = NConnInfo}.
process_connect(Channel = #channel{ctx = Ctx, process_connect(Channel = #channel{ctx = Ctx,
session = Session,
conninfo = ConnInfo, conninfo = ConnInfo,
clientinfo = ClientInfo}, clientinfo = ClientInfo},
Msg) -> Msg) ->
SessFun = fun(_,_) -> Session end, SessFun = fun(_,_) -> emqx_coap_session:new() end,
case emqx_gateway_ctx:open_session( case emqx_gateway_ctx:open_session(
Ctx, Ctx,
true, true,
@ -367,14 +366,16 @@ run_hooks(Ctx, Name, Args, Acc) ->
emqx_hooks:run_fold(Name, Args, Acc). emqx_hooks:run_fold(Name, Args, Acc).
reply(Channel, Method, Payload, Req) -> reply(Channel, Method, Payload, Req) ->
call_session(Channel, reply, [Req, Method, Payload]). call_session(reply, [Req, Method, Payload], Channel).
ack(Channel, Method, Payload, Req) -> ack(Channel, Method, Payload, Req) ->
call_session(Channel, piggyback, [Req, Method, Payload]). call_session(piggyback, [Req, Method, Payload], Channel).
call_session(#channel{session = Session, call_session(F,
config = Cfg} = Channel, F, A) -> A,
case erlang:apply(emqx_coap_session, F, [Session, Cfg | A]) of #channel{session = Session,
config = Cfg} = Channel) ->
case erlang:apply(emqx_coap_session, F, A ++ [Cfg, Session]) of
#{out := Out, #{out := Out,
session := Session2} -> session := Session2} ->
{ok, {outgoing, Out}, Channel#channel{session = Session2}}; {ok, {outgoing, Out}, Channel#channel{session = Session2}};

View File

@ -54,9 +54,9 @@
-define(OPTION_PROXY_SCHEME, 39). -define(OPTION_PROXY_SCHEME, 39).
-define(OPTION_SIZE1, 60). -define(OPTION_SIZE1, 60).
%%%=================================================================== %%--------------------------------------------------------------------
%%% API %% API
%%%=================================================================== %%--------------------------------------------------------------------
initial_parse_state(_) -> initial_parse_state(_) ->
#{}. #{}.
@ -64,9 +64,9 @@ initial_parse_state(_) ->
serialize_opts() -> serialize_opts() ->
#{}. #{}.
%%%=================================================================== %%--------------------------------------------------------------------
%%% serialize_pkt %% serialize_pkt
%%%=================================================================== %%--------------------------------------------------------------------
%% empty message %% empty message
serialize_pkt(#coap_message{type = Type, method = undefined, id = MsgId}, _Opts) -> serialize_pkt(#coap_message{type = Type, method = undefined, id = MsgId}, _Opts) ->
<<?VERSION:2, (encode_type(Type)):2, 0:4, 0:3, 0:5, MsgId:16>>; <<?VERSION:2, (encode_type(Type)):2, 0:4, 0:3, 0:5, MsgId:16>>;
@ -223,9 +223,9 @@ method_to_class_code({error, proxying_not_supported}) -> {5, 05};
method_to_class_code(Method) -> method_to_class_code(Method) ->
erlang:throw({bad_method, Method}). erlang:throw({bad_method, Method}).
%%%=================================================================== %%--------------------------------------------------------------------
%%% parse %% parse
%%%=================================================================== %%--------------------------------------------------------------------
parse(<<?VERSION:2, Type:2, 0:4, 0:3, 0:5, MsgId:16>>, ParseState) -> parse(<<?VERSION:2, Type:2, 0:4, 0:3, 0:5, MsgId:16>>, ParseState) ->
{ok, {ok,
#coap_message{ type = decode_type(Type) #coap_message{ type = decode_type(Type)
@ -410,9 +410,9 @@ is_message(#coap_message{}) ->
is_message(_) -> is_message(_) ->
false. false.
%%%=================================================================== %%--------------------------------------------------------------------
%%% Internal functions %% Internal functions
%%%=================================================================== %%--------------------------------------------------------------------
-spec is_repeatable_option(message_option_name()) -> boolean(). -spec is_repeatable_option(message_option_name()) -> boolean().
is_repeatable_option(if_match) -> true; is_repeatable_option(if_match) -> true;
is_repeatable_option(etag) -> true; is_repeatable_option(etag) -> true;

View File

@ -17,7 +17,7 @@
-module(emqx_coap_observe_res). -module(emqx_coap_observe_res).
%% API %% API
-export([ new/0, insert/3, remove/2 -export([ new_manager/0, insert/3, remove/2
, res_changed/2, foreach/2]). , res_changed/2, foreach/2]).
-export_type([manager/0]). -export_type([manager/0]).
@ -26,6 +26,7 @@
-type topic() :: binary(). -type topic() :: binary().
-type token() :: binary(). -type token() :: binary().
-type seq_id() :: 0 .. ?MAX_SEQ_ID. -type seq_id() :: 0 .. ?MAX_SEQ_ID.
-type res() :: #{ token := token() -type res() :: #{ token := token()
, seq_id := seq_id() , seq_id := seq_id()
}. }.
@ -35,12 +36,12 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec new() -> manager(). -spec new_manager() -> manager().
new() -> new_manager() ->
#{}. #{}.
-spec insert(manager(), topic(), token()) -> manager(). -spec insert(topic(), token(), manager()) -> manager().
insert(Manager, Topic, Token) -> insert(Topic, Token, Manager) ->
case maps:get(Topic, Manager, undefined) of case maps:get(Topic, Manager, undefined) of
undefined -> undefined ->
Manager#{Topic => new_res(Token)}; Manager#{Topic => new_res(Token)};
@ -48,12 +49,12 @@ insert(Manager, Topic, Token) ->
Manager Manager
end. end.
-spec remove(manager(), topic()) -> manager(). -spec remove(topic(), manager()) -> manager().
remove(Manager, Topic) -> remove(Topic, Manager) ->
maps:remove(Topic, Manager). maps:remove(Topic, Manager).
-spec res_changed(manager(), topic()) -> undefined | {token(), seq_id(), manager()}. -spec res_changed(topic(), manager()) -> undefined | {token(), seq_id(), manager()}.
res_changed(Manager, Topic) -> res_changed(Topic, Manager) ->
case maps:get(Topic, Manager, undefined) of case maps:get(Topic, Manager, undefined) of
undefined -> undefined ->
undefined; undefined;

View File

@ -47,52 +47,52 @@
new() -> new() ->
_ = emqx_misc:rand_seed(), _ = emqx_misc:rand_seed(),
#session{ transport_manager = emqx_coap_tm:new() #session{ transport_manager = emqx_coap_tm:new()
, observe_manager = emqx_coap_observe_res:new() , observe_manager = emqx_coap_observe_res:new_manager()
, next_msg_id = rand:uniform(?MAX_MESSAGE_ID)}. , next_msg_id = rand:uniform(?MAX_MESSAGE_ID)}.
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
%%% Process Message %%% Process Message
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
received(Session, Cfg, #coap_message{type = ack} = Msg) -> received(#coap_message{type = ack} = Msg, Cfg, Session) ->
handle_response(Session, Cfg, Msg); handle_response(Msg, Cfg, Session);
received(Session, Cfg, #coap_message{type = reset} = Msg) -> received(#coap_message{type = reset} = Msg, Cfg, Session) ->
handle_response(Session, Cfg, Msg); handle_response(Msg, Cfg, Session);
received(Session, Cfg, #coap_message{method = Method} = Msg) when is_atom(Method) -> received(#coap_message{method = Method} = Msg, Cfg, Session) when is_atom(Method) ->
handle_request(Session, Cfg, Msg); handle_request(Msg, Cfg, Session);
received(Session, Cfg, Msg) -> received(Msg, Cfg, Session) ->
handle_response(Session, Cfg, Msg). handle_response(Msg, Cfg, Session).
reply(Session, Cfg, Req, Method) -> reply(Req, Method, Cfg, Session) ->
reply(Session, Cfg, Req, Method, <<>>). reply(Req, Method, <<>>, Cfg, Session).
reply(Session, Cfg, Req, Method, Payload) -> reply(Req, Method, Payload, Cfg, Session) ->
Response = emqx_coap_message:response(Method, Payload, Req), Response = emqx_coap_message:response(Method, Payload, Req),
handle_out(Session, Cfg, Response). handle_out(Response, Cfg, Session).
ack(Session, Cfg, Req) -> ack(Req, Cfg, Session) ->
piggyback(Session, Cfg, Req, <<>>). piggyback(Req, <<>>, Cfg, Session).
piggyback(Session, Cfg, Req, Payload) -> piggyback(Req, Payload, Cfg, Session) ->
Response = emqx_coap_message:ack(Req), Response = emqx_coap_message:ack(Req),
Response2 = emqx_coap_message:set_payload(Payload, Response), Response2 = emqx_coap_message:set_payload(Payload, Response),
handle_out(Session, Cfg, Response2). handle_out(Response2, Cfg, Session).
deliver(Session, Cfg, Delivers) -> deliver(Delivers, Cfg, Session) ->
Fun = fun({_, Topic, Message}, Fun = fun({_, Topic, Message},
#{out := OutAcc, #{out := OutAcc,
session := #session{observe_manager = OM, session := #session{observe_manager = OM,
next_msg_id = MsgId} = SAcc} = Acc) -> next_msg_id = MsgId} = SAcc} = Acc) ->
case emqx_coap_observe_res:res_changed(OM, Topic) of case emqx_coap_observe_res:res_changed(Topic, OM) of
undefined -> undefined ->
Acc; Acc;
{Token, SeqId, OM2} -> {Token, SeqId, OM2} ->
Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Cfg), Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Cfg),
SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId), SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId),
observe_manager = OM2}, observe_manager = OM2},
#{out := Out} = Result = call_transport_manager(SAcc2, Cfg, Msg, handle_out), #{out := Out} = Result = call_transport_manager(handle_out, Msg, Cfg, SAcc2),
Result#{out := [Out | OutAcc]} Result#{out := [Out | OutAcc]}
end end
end, end,
@ -101,35 +101,35 @@ deliver(Session, Cfg, Delivers) ->
session => Session}, session => Session},
Delivers). Delivers).
timeout(Session, Cfg, Timer) -> timeout(Timer, Cfg, Session) ->
call_transport_manager(Session, Cfg, Timer, ?FUNCTION_NAME). call_transport_manager(?FUNCTION_NAME, Timer, Cfg, Session).
transfer_result(Result, From, Value) -> transfer_result(From, Value, Result) ->
?TRANSFER_RESULT(Result, [out, subscribe], From, Value). ?TRANSFER_RESULT([out, subscribe], From, Value, Result).
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
handle_request(Session, Cfg, Msg) -> handle_request(Msg, Cfg, Session) ->
call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME). call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session).
handle_response(Session, Cfg, Msg) -> handle_response(Msg, Cfg, Session) ->
call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME). call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session).
handle_out(Session, Cfg, Msg) -> handle_out(Msg, Cfg, Session) ->
call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME). call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session).
call_transport_manager(#session{transport_manager = TM} = Session, call_transport_manager(Fun,
Cfg,
Msg, Msg,
Fun) -> Cfg,
#session{transport_manager = TM} = Session) ->
try try
Result = emqx_coap_tm:Fun(Msg, TM, Cfg), Result = emqx_coap_tm:Fun(Msg, Cfg, TM),
{ok, _, Session2} = emqx_misc:pipeline([fun process_tm/2, {ok, _, Session2} = emqx_misc:pipeline([fun process_tm/2,
fun process_subscribe/2], fun process_subscribe/2],
Result, Result,
Session), Session),
emqx_coap_channel:transfer_result(Result, session, Session2) emqx_coap_channel:transfer_result(session, Session2, Result)
catch Type:Reason:Stack -> catch Type:Reason:Stack ->
?ERROR("process transmission with, message:~p failed~n ?ERROR("process transmission with, message:~p failed~n
Type:~p,Reason:~p~n,StackTrace:~p~n", [Msg, Type, Reason, Stack]), Type:~p,Reason:~p~n,StackTrace:~p~n", [Msg, Type, Reason, Stack]),
@ -146,10 +146,10 @@ process_subscribe(#{subscribe := Sub}, #session{observe_manager = OM} = Session
undefined -> undefined ->
{ok, Session}; {ok, Session};
{Topic, Token} -> {Topic, Token} ->
OM2 = emqx_coap_observe_res:insert(OM, Topic, Token), OM2 = emqx_coap_observe_res:insert(Topic, Token, OM),
{ok, Session#session{observe_manager = OM2}}; {ok, Session#session{observe_manager = OM2}};
Topic -> Topic ->
OM2 = emqx_coap_observe_res:remove(OM, Topic), OM2 = emqx_coap_observe_res:remove(Topic, OM),
{ok, Session#session{observe_manager = OM2}} {ok, Session#session{observe_manager = OM2}}
end; end;
process_subscribe(_, Session) -> process_subscribe(_, Session) ->

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% transport manager %% the transport state machine manager
-module(emqx_coap_tm). -module(emqx_coap_tm).
-export([ new/0 -export([ new/0
@ -23,23 +23,23 @@
, handle_out/3 , handle_out/3
, timeout/3]). , timeout/3]).
-export_type([manager/0, event_result/2]). -export_type([manager/0, event_result/1]).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). -include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-type direction() :: in | out. -type direction() :: in | out.
-type transport_id() :: {direction(), non_neg_integer()}. -type state_machine_id() :: {direction(), non_neg_integer()}.
-record(transport, { id :: transport_id() -record(state_machine, { id :: state_machine_id()
, state :: atom() , state :: atom()
, timers :: maps:map() , timers :: maps:map()
, data :: any()}). , transport :: emqx_coap_transport:transport()}).
-type transport() :: #transport{}. -type state_machine() :: #state_machine{}.
-type message_id() :: 0 .. ?MAX_MESSAGE_ID. -type message_id() :: 0 .. ?MAX_MESSAGE_ID.
-type manager() :: #{message_id() => transport()}. -type manager() :: #{message_id() => state_machine()}.
-type ttimeout() :: {state_timeout, pos_integer(), any()} -type ttimeout() :: {state_timeout, pos_integer(), any()}
| {stop_timeout, pos_integer()}. | {stop_timeout, pos_integer()}.
@ -47,32 +47,31 @@
-type topic() :: binary(). -type topic() :: binary().
-type token() :: binary(). -type token() :: binary().
-type sub_register() :: {topic(), token()} | topic(). -type sub_register() :: {topic(), token()} | topic().
-type event_result(State) ::
-type event_result(State, Data) ::
#{next => State, #{next => State,
outgoing => emqx_coap_message(), outgoing => emqx_coap_message(),
timeouts => list(ttimeout()), timeouts => list(ttimeout()),
has_sub => undefined | sub_register(), has_sub => undefined | sub_register(),
data => Data}. transport => emqx_coap_transport:transprot()}.
%%%=================================================================== %%--------------------------------------------------------------------
%%% API %% API
%%%=================================================================== %%--------------------------------------------------------------------
new() -> new() ->
#{}. #{}.
handle_request(#coap_message{id = MsgId} = Msg, TM, Cfg) -> handle_request(#coap_message{id = MsgId} = Msg, Cfg, TM) ->
Id = {in, MsgId}, Id = {in, MsgId},
case maps:get(Id, TM, undefined) of case maps:get(Id, TM, undefined) of
undefined -> undefined ->
Data = emqx_coap_transport:new(), Transport = emqx_coap_transport:new(),
Transport = new_transport(Id, Data), Machine = new_state_machine(Id, Transport),
process_event(in, Msg, TM, Transport, Cfg); process_event(in, Msg, TM, Machine, Cfg);
TP -> Machine ->
process_event(in, Msg, TM, TP, Cfg) process_event(in, Msg, TM, Machine, Cfg)
end. end.
handle_response(#coap_message{type = Type, id = MsgId} = Msg, TM, Cfg) -> handle_response(#coap_message{type = Type, id = MsgId} = Msg, Cfg, TM) ->
Id = {out, MsgId}, Id = {out, MsgId},
case maps:get(Id, TM, undefined) of case maps:get(Id, TM, undefined) of
undefined -> undefined ->
@ -83,56 +82,50 @@ handle_response(#coap_message{type = Type, id = MsgId} = Msg, TM, Cfg) ->
#{out => #coap_message{type = reset, #{out => #coap_message{type = reset,
id = MsgId}} id = MsgId}}
end; end;
TP -> Machine ->
process_event(in, Msg, TM, TP, Cfg) process_event(in, Msg, TM, Machine, Cfg)
end. end.
handle_out(#coap_message{id = MsgId} = Msg, TM, Cfg) -> handle_out(#coap_message{id = MsgId} = Msg, Cfg, TM) ->
Id = {out, MsgId}, Id = {out, MsgId},
case maps:get(Id, TM, undefined) of case maps:get(Id, TM, undefined) of
undefined -> undefined ->
Data = emqx_coap_transport:new(), Transport = emqx_coap_transport:new(),
Transport = new_transport(Id, Data), Machine = new_state_machine(Id, Transport),
process_event(out, Msg, TM, Transport, Cfg); process_event(out, Msg, TM, Machine, Cfg);
_ -> _ ->
?WARN("Repeat sending message with id:~p~n", [Id]), ?WARN("Repeat sending message with id:~p~n", [Id]),
?EMPTY_RESULT ?EMPTY_RESULT
end. end.
timeout({Id, Type, Msg}, TM, Cfg) -> timeout({Id, Type, Msg}, Cfg, TM) ->
case maps:get(Id, TM, undefined) of case maps:get(Id, TM, undefined) of
undefined -> undefined ->
?EMPTY_RESULT; ?EMPTY_RESULT;
#transport{timers = Timers} = TP -> #state_machine{timers = Timers} = Machine ->
%% maybe timer has been canceled %% maybe timer has been canceled
case maps:is_key(Type, Timers) of case maps:is_key(Type, Timers) of
true -> true ->
process_event(Type, Msg, TM, TP, Cfg); process_event(Type, Msg, TM, Machine, Cfg);
_ -> _ ->
?EMPTY_RESULT ?EMPTY_RESULT
end end
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% Internal functions
%% @spec
%% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
new_state_machine(Id, Transport) ->
%%%=================================================================== #state_machine{id = Id,
%%% Internal functions state = idle,
%%%=================================================================== timers = #{},
new_transport(Id, Data) -> transport = Transport}.
#transport{id = Id,
state = idle,
timers = #{},
data = Data}.
process_event(stop_timeout, process_event(stop_timeout,
_, _,
TM, TM,
#transport{id = Id, #state_machine{id = Id,
timers = Timers}, timers = Timers},
_) -> _) ->
lists:foreach(fun({_, Ref}) -> lists:foreach(fun({_, Ref}) ->
emqx_misc:cancel_timer(Ref) emqx_misc:cancel_timer(Ref)
@ -143,42 +136,42 @@ process_event(stop_timeout,
process_event(Event, process_event(Event,
Msg, Msg,
TM, TM,
#transport{id = Id, #state_machine{id = Id,
state = State, state = State,
data = Data} = TP, transport = Transport} = Machine,
Cfg) -> Cfg) ->
Result = emqx_coap_transport:State(Event, Msg, Data, Cfg), Result = emqx_coap_transport:State(Event, Msg, Transport, Cfg),
{ok, _, TP2} = emqx_misc:pipeline([fun process_state_change/2, {ok, _, Machine2} = emqx_misc:pipeline([fun process_state_change/2,
fun process_data_change/2, fun process_transport_change/2,
fun process_timeouts/2], fun process_timeouts/2],
Result, Result,
TP), Machine),
TM2 = TM#{Id => TP2}, TM2 = TM#{Id => Machine2},
emqx_coap_session:transfer_result(Result, tm, TM2). emqx_coap_session:transfer_result(tm, TM2, Result).
process_state_change(#{next := Next}, TP) -> process_state_change(#{next := Next}, Machine) ->
{ok, cancel_state_timer(TP#transport{state = Next})}; {ok, cancel_state_timer(Machine#state_machine{state = Next})};
process_state_change(_, TP) -> process_state_change(_, Machine) ->
{ok, TP}. {ok, Machine}.
cancel_state_timer(#transport{timers = Timers} = TP) -> cancel_state_timer(#state_machine{timers = Timers} = Machine) ->
case maps:get(state_timer, Timers, undefined) of case maps:get(state_timer, Timers, undefined) of
undefined -> undefined ->
TP; Machine;
Ref -> Ref ->
_ = emqx_misc:cancel_timer(Ref), _ = emqx_misc:cancel_timer(Ref),
TP#transport{timers = maps:remove(state_timer, Timers)} Machine#state_machine{timers = maps:remove(state_timer, Timers)}
end. end.
process_data_change(#{data := Data}, TP) -> process_transport_change(#{transport := Transport}, Machine) ->
{ok, TP#transport{data = Data}}; {ok, Machine#state_machine{transport = Transport}};
process_data_change(_, TP) -> process_transport_change(_, Machine) ->
{ok, TP}. {ok, Machine}.
process_timeouts(#{timeouts := []}, TP) -> process_timeouts(#{timeouts := []}, Machine) ->
{ok, TP}; {ok, Machine};
process_timeouts(#{timeouts := Timeouts}, process_timeouts(#{timeouts := Timeouts},
#transport{id = Id, timers = Timers} = TP) -> #state_machine{id = Id, timers = Timers} = Machine) ->
NewTimers = lists:foldl(fun({state_timeout, _, _} = Timer, Acc) -> NewTimers = lists:foldl(fun({state_timeout, _, _} = Timer, Acc) ->
process_timer(Id, Timer, Acc); process_timer(Id, Timer, Acc);
({stop_timeout, I}, Acc) -> ({stop_timeout, I}, Acc) ->
@ -186,11 +179,11 @@ process_timeouts(#{timeouts := Timeouts},
end, end,
Timers, Timers,
Timeouts), Timeouts),
{ok, TP#transport{timers = NewTimers}}; {ok, Machine#state_machine{timers = NewTimers}};
process_timeouts(_, TP) -> process_timeouts(_, Machine) ->
{ok, TP}. {ok, Machine}.
process_timer(Id, {Type, Interval, Msg}, Timers) -> process_timer(Id, {Type, Interval, Msg}, Timers) ->
Ref = emqx_misc:start_timer(Interval, {transport, {Id, Type, Msg}}), Ref = emqx_misc:start_timer(Interval, {state_machine, {Id, Type, Msg}}),
Timers#{Type => Ref}. Timers#{Type => Ref}.

View File

@ -9,21 +9,23 @@
-define(EXCHANGE_LIFETIME, 247000). -define(EXCHANGE_LIFETIME, 247000).
-define(NON_LIFETIME, 145000). -define(NON_LIFETIME, 145000).
-record(data, { cache :: undefined | emqx_coap_message() -record(transport, { cache :: undefined | emqx_coap_message()
, retry_interval :: non_neg_integer() , retry_interval :: non_neg_integer()
, retry_count :: non_neg_integer() , retry_count :: non_neg_integer()
}). }).
-type data() :: #data{}. -type transport() :: #transport{}.
-export([ new/0, idle/4, maybe_reset/4 -export([ new/0, idle/4, maybe_reset/4
, maybe_resend/4, wait_ack/4, until_stop/4]). , maybe_resend/4, wait_ack/4, until_stop/4]).
-spec new() -> data(). -export_type([transport/0]).
-spec new() -> transport().
new() -> new() ->
#data{cache = undefined, #transport{cache = undefined,
retry_interval = 0, retry_interval = 0,
retry_count = 0}. retry_count = 0}.
idle(in, idle(in,
#coap_message{type = non, id = MsgId, method = Method} = Msg, #coap_message{type = non, id = MsgId, method = Method} = Msg,
@ -50,14 +52,14 @@ idle(in,
#coap_message{id = MsgId, #coap_message{id = MsgId,
type = con, type = con,
method = Method} = Msg, method = Method} = Msg,
Data, Transport,
#{resource := Resource} = Cfg) -> #{resource := Resource} = Cfg) ->
Ret = #{next => maybe_resend, Ret = #{next => maybe_resend,
timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]}, timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]},
case Method of case Method of
undefined -> undefined ->
ResetMsg = #coap_message{type = reset, id = MsgId}, ResetMsg = #coap_message{type = reset, id = MsgId},
Ret#{data => Data#data{cache = ResetMsg}, Ret#{transport => Transport#transport{cache = ResetMsg},
out => ResetMsg}; out => ResetMsg};
_ -> _ ->
{RetMsg, SubInfo} = {RetMsg, SubInfo} =
@ -72,7 +74,7 @@ idle(in,
end, end,
RetMsg2 = RetMsg#coap_message{type = ack}, RetMsg2 = RetMsg#coap_message{type = ack},
Ret#{out => RetMsg2, Ret#{out => RetMsg2,
data => Data#data{cache = RetMsg2}, transport => Transport#transport{cache = RetMsg2},
subscribe => SubInfo} subscribe => SubInfo}
end; end;
@ -81,11 +83,11 @@ idle(out, #coap_message{type = non} = Msg, _, _) ->
out => Msg, out => Msg,
timeouts => [{stop_timeout, ?NON_LIFETIME}]}; timeouts => [{stop_timeout, ?NON_LIFETIME}]};
idle(out, Msg, Data, _) -> idle(out, Msg, Transport, _) ->
_ = emqx_misc:rand_seed(), _ = emqx_misc:rand_seed(),
Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR), Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR),
#{next => wait_ack, #{next => wait_ack,
data => Data#data{cache = Msg}, transport => Transport#transport{cache = Msg},
out => Msg, out => Msg,
timeouts => [ {state_timeout, Timeout, ack_timeout} timeouts => [ {state_timeout, Timeout, ack_timeout}
, {stop_timeout, ?EXCHANGE_LIFETIME}]}. , {stop_timeout, ?EXCHANGE_LIFETIME}]}.
@ -99,7 +101,7 @@ maybe_reset(in, Message, _, _) ->
end, end,
?EMPTY_RESULT. ?EMPTY_RESULT.
maybe_resend(in, _, _, #data{cache = Cache}) -> maybe_resend(in, _, _, #transport{cache = Cache}) ->
#{out => Cache}. #{out => Cache}.
wait_ack(in, #coap_message{type = Type}, _, _) -> wait_ack(in, #coap_message{type = Type}, _, _) ->
@ -115,14 +117,14 @@ wait_ack(in, #coap_message{type = Type}, _, _) ->
wait_ack(state_timeout, wait_ack(state_timeout,
ack_timeout, ack_timeout,
_, _,
#data{cache = Msg, #transport{cache = Msg,
retry_interval = Timeout, retry_interval = Timeout,
retry_count = Count} =Data) -> retry_count = Count} =Transport) ->
case Count < ?MAX_RETRANSMIT of case Count < ?MAX_RETRANSMIT of
true -> true ->
Timeout2 = Timeout * 2, Timeout2 = Timeout * 2,
#{data => Data#data{retry_interval = Timeout2, #{transport => Transport#transport{retry_interval = Timeout2,
retry_count = Count + 1}, retry_count = Count + 1},
out => Msg, out => Msg,
timeouts => [{state_timeout, Timeout2, ack_timeout}]}; timeouts => [{state_timeout, Timeout2, ack_timeout}]};
_ -> _ ->

View File

@ -23,7 +23,7 @@
-define(MAXIMUM_MAX_AGE, 4294967295). -define(MAXIMUM_MAX_AGE, 4294967295).
-define(EMPTY_RESULT, #{}). -define(EMPTY_RESULT, #{}).
-define(TRANSFER_RESULT(R1, Keys, From, Value), -define(TRANSFER_RESULT(Keys, From, Value, R1),
begin begin
R2 = maps:with(Keys, R1), R2 = maps:with(Keys, R1),
R2#{From => Value} R2#{From => Value}