412 lines
12 KiB
Erlang
412 lines
12 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% the transport state machine manager
|
|
-module(emqx_coap_tm).
|
|
|
|
-export([
|
|
new/0,
|
|
handle_request/2,
|
|
handle_response/2,
|
|
handle_out/2,
|
|
handle_out/3,
|
|
set_reply/2,
|
|
timeout/2
|
|
]).
|
|
|
|
-export_type([manager/0, event_result/1]).
|
|
|
|
-include("emqx_coap.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
%% `in' keys are used by handle_request/2 and set_reply/2, `out' keys by
%% handle_response/2 and handle_out/3.
-type direction() :: in | out.

%% One state machine per message exchange tracked by the manager.
-record(state_machine, {
    %% key under which the machine itself is stored in the manager map
    seq_id :: seq_id(),
    %% {Direction, MessageId} alias; the manager maps it to seq_id
    id :: state_machine_key(),
    %% token of the outgoing message; not set for machines created by
    %% new_in_machine/2
    token :: token() | undefined,
    %% `observe' option of the outgoing message; flipped from 0 to
    %% `observed' by process_observe_response/5
    observe :: 0 | 1 | undefined | observed,
    %% name of the emqx_coap_transport function that handles the next event
    state :: atom(),
    %% timeout type => timer reference (see process_timer/3)
    timers :: map(),
    %% opaque state owned by emqx_coap_transport
    transport :: emqx_coap_transport:transport()
}).
-type state_machine() :: #state_machine{}.

-type message_id() :: 0..?MAX_MESSAGE_ID.
-type token_key() :: {token, token()}.
-type state_machine_key() :: {direction(), message_id()}.
-type seq_id() :: pos_integer().
-type manager_key() :: token_key() | state_machine_key() | seq_id().

%% The manager is a single flat map: two bookkeeping counters, alias keys
%% (token / message id) pointing at a seq_id, and seq_id keys pointing at the
%% machine itself.
-type manager() :: #{
    seq_id => seq_id(),
    next_msg_id => coap_message_id(),
    token_key() => seq_id(),
    state_machine_key() => seq_id(),
    seq_id() => state_machine()
}.

%% Timeout requests accepted by process_timeouts/5.
-type ttimeout() ::
    {state_timeout, pos_integer(), any()}
    | {stop_timeout, pos_integer()}.

-type topic() :: binary().
-type token() :: binary().
-type sub_register() :: {topic(), token()} | topic().

%% Result map threaded through the processing steps of process_event/4.
-type event_result(State) ::
    #{
        next => State,
        outgoing => coap_message(),
        timeouts => list(ttimeout()),
        has_sub => undefined | sub_register(),
        transport => emqx_coap_transport:transport()
    }.
|
|
|
|
-define(TOKEN_ID(T), {token, T}).
|
|
|
|
-import(emqx_coap_medium, [empty/0, iter/4, reset/1, proto_out/2]).
|
|
|
|
-elvis([{elvis_style, no_if_expression, disable}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Build an empty manager: the sequence counter starts at 1 and the first
%% outgoing message id is drawn at random from 1..?MAX_MESSAGE_ID.
-spec new() -> manager().
new() ->
    FirstMsgId = rand:uniform(?MAX_MESSAGE_ID),
    #{seq_id => 1, next_msg_id => FirstMsgId}.
|
|
|
|
%% Feed an incoming request to the `in' machine registered for its message
%% id. When no such machine exists, one is created (and registered in the
%% manager) before the event is processed.
handle_request(#coap_message{id = MsgId} = Msg, TM) ->
    Key = {in, MsgId},
    {Machine, Manager} =
        case find_machine(Key, TM) of
            undefined -> new_in_machine(Key, TM);
            Existing -> {Existing, TM}
        end,
    process_event(in, Msg, Manager, Machine).
|
|
|
|
%% client response
%% The matching outgoing machine is looked up by message id first and by
%% token second. Without a match, a `reset' message yields
%% emqx_coap_medium:empty/0 and any other message is passed to
%% emqx_coap_medium:reset/1.
handle_response(#coap_message{type = Type, id = MsgId, token = Token} = Msg, TM) ->
    Keys = [{out, MsgId}, ?TOKEN_ID(Token)],
    case {find_machine_by_keys(Keys, TM), Type} of
        {undefined, reset} ->
            empty();
        {undefined, _} ->
            reset(Msg);
        {Machine, _} ->
            process_event(in, Msg, TM, Machine)
    end.
|
|
|
|
%% send to a client, msg can be request/piggyback/separate/notify
%% Accepts either a `{Ctx, Msg}' pair or a bare message; a bare message is
%% sent with an `undefined' context.
handle_out(Outgoing, TM) ->
    case Outgoing of
        {Ctx, Msg} ->
            handle_out(Msg, Ctx, TM);
        BareMsg ->
            handle_out(BareMsg, undefined, TM)
    end.
|
|
|
|
%% Allocate a fresh message id for the outgoing message and start an `out'
%% machine for it. alloc_message_id/1 only returns ids whose `{out, Id}' key
%% is free, so an existing machine can only be found through the token; in
%% that case nothing is sent and emqx_coap_medium:empty/0 is returned.
handle_out(#coap_message{token = Token} = MsgT, Ctx, TM) ->
    {MsgId, TM2} = alloc_message_id(TM),
    Key = {out, MsgId},
    %% TODO why find by token ?
    case find_machine_by_keys([Key, ?TOKEN_ID(Token)], TM2) of
        undefined ->
            Msg = MsgT#coap_message{id = MsgId},
            {Machine, TM3} = new_out_machine(Key, Ctx, Msg, TM2),
            process_event(out, Msg, TM3, Machine);
        _Existing ->
            %% ignore repeat send
            empty()
    end.
|
|
|
|
%% Store a reply in the transport cache of the `in' machine that owns the
%% reply's message id and return the updated manager. The manager is
%% returned unchanged when no such machine exists.
set_reply(#coap_message{id = MsgId} = Msg, TM) ->
    case find_machine({in, MsgId}, TM) of
        #state_machine{seq_id = SeqId, transport = Transport} = Machine ->
            Cached = emqx_coap_transport:set_cache(Msg, Transport),
            TM#{SeqId => Machine#state_machine{transport = Cached}};
        undefined ->
            TM
    end.
|
|
|
|
%% Handle a fired timer described by `{SeqId, Type, Msg}'. The event is
%% processed only when the machine still exists and still has a timer of
%% that type registered; otherwise emqx_coap_medium:empty/0 is returned.
timeout({SeqId, Type, Msg}, TM) ->
    case maps:get(SeqId, TM, undefined) of
        #state_machine{timers = Timers} = Machine ->
            %% maybe timer has been canceled
            StillArmed = maps:is_key(Type, Timers),
            if
                StillArmed ->
                    process_event(Type, Msg, TM, Machine);
                true ->
                    empty()
            end;
        undefined ->
            empty()
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
%% Run one event through a state machine.
%%
%% A `stop_timeout' event removes the machine from the manager directly.
%% Any other event is dispatched to the emqx_coap_transport function named
%% by the machine's current state, and the result is handed to
%% emqx_coap_medium:iter/4 together with the step list below.
%% NOTE(review): presumably iter/4 calls the handler that follows a key only
%% when that key is present in the result, and always calls the trailing
%% arity-4 fun -- confirm in emqx_coap_medium.
process_event(stop_timeout, _, TM, Machine) ->
    process_manager(stop, #{}, Machine, TM);
process_event(
    Event,
    Msg,
    TM,
    #state_machine{state = State, transport = Transport} = Machine
) ->
    Steps = [
        proto,
        fun process_observe_response/5,
        next,
        fun process_state_change/5,
        transport,
        fun process_transport_change/5,
        timeouts,
        fun process_timeouts/5,
        fun process_manager/4
    ],
    Result = emqx_coap_transport:State(Event, Msg, Transport),
    iter(Steps, Result, Machine, TM).
|
|
|
|
%% `proto' step. The protocol item is always forwarded through
%% emqx_coap_medium:proto_out/2. In addition, when the machine was started
%% with observe = 0 and the response method is `{ok, _}', the machine is
%% marked `observed' and the result asks for the `observe' state next.
process_observe_response(
    {response, {_, Msg}} = Response,
    Result,
    #state_machine{observe = 0} = Machine,
    TM,
    Iter
) ->
    Forwarded = proto_out(Response, Result),
    {Result2, Machine2} =
        case Msg#coap_message.method of
            {ok, _} ->
                {
                    Forwarded#{next => observe},
                    Machine#state_machine{observe = observed}
                };
            _ ->
                {Forwarded, Machine}
        end,
    iter(Iter, Result2, Machine2, TM);
process_observe_response(Proto, Result, Machine, TM, Iter) ->
    Forwarded = proto_out(Proto, Result),
    iter(Iter, Forwarded, Machine, TM).
|
|
|
|
%% `next' step. `stop' short-circuits the remaining steps and removes the
%% machine via process_manager/4. Any other value becomes the machine's new
%% state; the machine is then passed through cancel_state_timer/1 before the
%% remaining steps run.
process_state_change(stop, Result, Machine, TM, _Iter) ->
    process_manager(stop, Result, Machine, TM);
process_state_change(Next, Result, Machine, TM, Iter) ->
    Entered = cancel_state_timer(Machine#state_machine{state = Next}),
    iter(Iter, Result, Entered, TM).
|
|
|
|
%% `transport' step: replace the machine's transport state with the one
%% carried in the result, then continue with the remaining steps.
process_transport_change(NewTransport, Result, Machine, TM, Iter) ->
    Updated = Machine#state_machine{transport = NewTransport},
    iter(Iter, Result, Updated, TM).
|
|
|
|
%% `timeouts' step: start one timer per requested timeout and record its
%% reference in the machine. A `{stop_timeout, Interval}' request carries no
%% payload, so it is started with the payload `stop'.
process_timeouts([], Result, Machine, TM, Iter) ->
    iter(Iter, Result, Machine, TM);
process_timeouts(
    Timeouts,
    Result,
    #state_machine{seq_id = SeqId, timers = Timers} = Machine,
    TM,
    Iter
) ->
    Arm = fun
        ({state_timeout, _, _} = Timer, Acc) ->
            process_timer(SeqId, Timer, Acc);
        ({stop_timeout, Interval}, Acc) ->
            process_timer(SeqId, {stop_timeout, Interval, stop}, Acc)
    end,
    Armed = lists:foldl(Arm, Timers, Timeouts),
    iter(Iter, Result, Machine#state_machine{timers = Armed}, TM).
|
|
|
|
%% Final step: publish the updated manager under the `tm' key of the result.
%% `stop' removes the machine from the manager; anything else stores the
%% (possibly modified) machine back under its seq_id.
process_manager(Step, Result, #state_machine{seq_id = SeqId} = Machine, TM) ->
    NewTM =
        case Step of
            stop -> delete_machine(SeqId, TM);
            _ -> TM#{SeqId => Machine}
        end,
    Result#{tm => NewTM}.
|
|
|
|
%% Cancel the machine's pending state timer, if any, and drop it from the
%% timers map.
%%
%% Timers are stored under their timeout type (`state_timeout' or
%% `stop_timeout') by process_timer/3, and timeout/2 checks those same keys.
%% The previous lookup used the key `state_timer', which is never written, so
%% the state timer was never cancelled and could still fire after the machine
%% had moved to another state.
cancel_state_timer(#state_machine{timers = Timers} = Machine) ->
    case maps:take(state_timeout, Timers) of
        {Ref, Remaining} ->
            _ = emqx_utils:cancel_timer(Ref),
            Machine#state_machine{timers = Remaining};
        error ->
            Machine
    end.
|
|
|
|
%% Start a timer whose message is `{state_machine, {SeqId, Type, Msg}}' and
%% remember its reference under `Type', replacing any reference previously
%% stored under that key.
process_timer(SeqId, {Type, Interval, Msg}, Timers) ->
    Payload = {state_machine, {SeqId, Type, Msg}},
    maps:put(Type, emqx_utils:start_timer(Interval, Payload), Timers).
|
|
|
|
%% Remove a machine, addressed by any of its keys, from the manager. All of
%% its timers are cancelled and its seq_id, message-id alias and token alias
%% are dropped. The manager is returned unchanged when no machine is found.
-spec delete_machine(manager_key(), manager()) -> manager().
delete_machine(Id, Manager) ->
    case find_machine(Id, Manager) of
        #state_machine{
            seq_id = SeqId,
            id = MachineId,
            token = Token,
            timers = Timers
        } ->
            lists:foreach(fun emqx_utils:cancel_timer/1, maps:values(Timers)),
            maps:without([SeqId, MachineId, ?TOKEN_ID(Token)], Manager);
        undefined ->
            Manager
    end.
|
|
|
|
%% Look a machine up by any manager key. Two-element tuple keys (message-id
%% or token aliases) are first resolved to a seq_id; every other key is
%% treated as a seq_id directly. Returns `undefined' when nothing is found.
-spec find_machine(manager_key(), manager()) -> state_machine() | undefined.
find_machine({_, _} = Alias, Manager) ->
    SeqId = maps:get(Alias, Manager, undefined),
    maps:get(SeqId, Manager, undefined);
find_machine(SeqId, Manager) ->
    maps:get(SeqId, Manager, undefined).
|
|
|
|
%% Fetch the value stored under a seq_id, or `undefined' when the key is
%% absent (which includes the case where the seq_id itself is `undefined').
-spec find_machine_by_seqid(seq_id() | undefined, manager()) ->
    state_machine() | undefined.
find_machine_by_seqid(SeqId, Manager) ->
    case maps:find(SeqId, Manager) of
        {ok, Machine} -> Machine;
        error -> undefined
    end.
|
|
|
|
%% Return the first machine found under any of the given keys, trying them
%% in order. An empty-token key is skipped without a lookup. Returns
%% `undefined' when no key resolves to a machine.
-spec find_machine_by_keys(
    list(manager_key()),
    manager()
) -> state_machine() | undefined.
find_machine_by_keys([?TOKEN_ID(<<>>) | Rest], Manager) ->
    find_machine_by_keys(Rest, Manager);
find_machine_by_keys([Key | Rest], Manager) ->
    case find_machine(Key, Manager) of
        undefined ->
            find_machine_by_keys(Rest, Manager);
        Machine ->
            Machine
    end;
find_machine_by_keys(_, _) ->
    undefined.
|
|
|
|
%% Create an idle machine for an incoming exchange and register it: the
%% machine is stored under the current seq_id, the message-id key is aliased
%% to that seq_id, and the manager's seq_id counter is advanced.
-spec new_in_machine(state_machine_key(), manager()) ->
    {state_machine(), manager()}.
new_in_machine(MachineId, #{seq_id := SeqId} = Manager) ->
    Machine = #state_machine{
        seq_id = SeqId,
        id = MachineId,
        state = idle,
        timers = #{},
        transport = emqx_coap_transport:new()
    },
    Registered = Manager#{
        seq_id := SeqId + 1,
        SeqId => Machine,
        MachineId => SeqId
    },
    {Machine, Registered}.
|
|
|
|
%% Create an idle machine for an outgoing message and register it under the
%% current seq_id and its message-id key. Token handling then depends on the
%% message:
%%   * empty token              -> no token alias;
%%   * observe = 1              -> whatever machine is currently registered
%%                                 under the token is deleted;
%%   * `con' type or observe = 0 -> the token is aliased to the new machine;
%%                                 throws "token conflict" when the token is
%%                                 already registered;
%%   * anything else            -> no token alias.
-spec new_out_machine(state_machine_key(), any(), coap_message(), manager()) ->
    {state_machine(), manager()}.
new_out_machine(
    MachineId,
    Ctx,
    #coap_message{type = Type, token = Token, options = Opts},
    #{seq_id := SeqId} = Manager
) ->
    Observe = maps:get(observe, Opts, undefined),
    Machine = #state_machine{
        seq_id = SeqId,
        id = MachineId,
        token = Token,
        observe = Observe,
        state = idle,
        timers = #{},
        transport = emqx_coap_transport:new(Ctx)
    },
    Registered = Manager#{
        seq_id := SeqId + 1,
        SeqId => Machine,
        MachineId => SeqId
    },
    TokenId = ?TOKEN_ID(Token),
    Final =
        if
            Token =:= <<>> ->
                Registered;
            Observe =:= 1 ->
                delete_machine(TokenId, Registered);
            Type =:= con orelse Observe =:= 0 ->
                %% the conflict check looks at the manager as it was before
                %% this machine was registered
                case maps:get(TokenId, Manager, undefined) of
                    undefined ->
                        Registered#{TokenId => SeqId};
                    _ ->
                        throw("token conflict")
                end;
            true ->
                Registered
        end,
    {Machine, Final}.
|
|
|
|
%% Pick the next outgoing message id that has no `{out, Id}' entry in the
%% manager, starting from the manager's `next_msg_id'. Returns the id
%% together with a manager whose `next_msg_id' points past it.
alloc_message_id(#{next_msg_id := Candidate} = TM) ->
    alloc_message_id(Candidate, TM).

%% Probe candidates one by one until a free id is found.
alloc_message_id(Candidate, TM) ->
    case maps:get({out, Candidate}, TM, undefined) of
        undefined ->
            Following = next_message_id(Candidate),
            {Candidate, TM#{next_msg_id => Following}};
        _InUse ->
            alloc_message_id(next_message_id(Candidate), TM)
    end.
|
|
|
|
%% Successor of a message id, wrapping back to 1 once ?MAX_MESSAGE_ID has
%% been used.
%%
%% The wrap test is `>' rather than `>=': new/0 may pick ?MAX_MESSAGE_ID as
%% the first id and message_id() includes it, so it is a valid id and must
%% not be skipped on later laps.
next_message_id(MsgId) ->
    case MsgId + 1 of
        Next when Next > ?MAX_MESSAGE_ID ->
            1;
        Next ->
            Next
    end.
|