emqx/apps/emqx_gateway_coap/src/emqx_coap_tm.erl

412 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% the transport state machine manager
-module(emqx_coap_tm).
-export([
new/0,
handle_request/2,
handle_response/2,
handle_out/2,
handle_out/3,
set_reply/2,
timeout/2
]).
-export_type([manager/0, event_result/1]).
-include("emqx_coap.hrl").
-include_lib("emqx/include/logger.hrl").
-type direction() :: in | out.
-record(state_machine, {
seq_id :: seq_id(),
id :: state_machine_key(),
token :: token() | undefined,
observe :: 0 | 1 | undefined | observed,
state :: atom(),
timers :: map(),
transport :: emqx_coap_transport:transport()
}).
-type state_machine() :: #state_machine{}.
-type message_id() :: 0..?MAX_MESSAGE_ID.
-type token_key() :: {token, token()}.
-type state_machine_key() :: {direction(), message_id()}.
-type seq_id() :: pos_integer().
-type manager_key() :: token_key() | state_machine_key() | seq_id().
-type manager() :: #{
seq_id => seq_id(),
next_msg_id => coap_message_id(),
token_key() => seq_id(),
state_machine_key() => seq_id(),
seq_id() => state_machine()
}.
-type ttimeout() ::
{state_timeout, pos_integer(), any()}
| {stop_timeout, pos_integer()}.
-type topic() :: binary().
-type token() :: binary().
-type sub_register() :: {topic(), token()} | topic().
-type event_result(State) ::
#{
next => State,
outgoing => coap_message(),
timeouts => list(ttimeout()),
has_sub => undefined | sub_register(),
transport => emqx_coap_transport:transport()
}.
-define(TOKEN_ID(T), {token, T}).
-import(emqx_coap_medium, [empty/0, iter/4, reset/1, proto_out/2]).
-elvis([{elvis_style, no_if_expression, disable}]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new() -> manager().
new() ->
#{
seq_id => 1,
next_msg_id => rand:uniform(?MAX_MESSAGE_ID)
}.
handle_request(#coap_message{id = MsgId} = Msg, TM) ->
Id = {in, MsgId},
case find_machine(Id, TM) of
undefined ->
{Machine, TM2} = new_in_machine(Id, TM),
process_event(in, Msg, TM2, Machine);
Machine ->
process_event(in, Msg, TM, Machine)
end.
%% client response
handle_response(#coap_message{type = Type, id = MsgId, token = Token} = Msg, TM) ->
Id = {out, MsgId},
TokenId = ?TOKEN_ID(Token),
case find_machine_by_keys([Id, TokenId], TM) of
undefined ->
case Type of
reset ->
empty();
_ ->
reset(Msg)
end;
Machine ->
process_event(in, Msg, TM, Machine)
end.
%% send to a client, msg can be request/piggyback/separate/notify
handle_out({Ctx, Msg}, TM) ->
handle_out(Msg, Ctx, TM);
handle_out(Msg, TM) ->
handle_out(Msg, undefined, TM).
handle_out(#coap_message{token = Token} = MsgT, Ctx, TM) ->
{MsgId, TM2} = alloc_message_id(TM),
Msg = MsgT#coap_message{id = MsgId},
Id = {out, MsgId},
TokenId = ?TOKEN_ID(Token),
%% TODO why find by token ?
case find_machine_by_keys([Id, TokenId], TM2) of
undefined ->
{Machine, TM3} = new_out_machine(Id, Ctx, Msg, TM2),
process_event(out, Msg, TM3, Machine);
_ ->
%% ignore repeat send
empty()
end.
set_reply(#coap_message{id = MsgId} = Msg, TM) ->
Id = {in, MsgId},
case find_machine(Id, TM) of
undefined ->
TM;
#state_machine{
transport = Transport,
seq_id = SeqId
} = Machine ->
Transport2 = emqx_coap_transport:set_cache(Msg, Transport),
Machine2 = Machine#state_machine{transport = Transport2},
TM#{SeqId => Machine2}
end.
timeout({SeqId, Type, Msg}, TM) ->
case maps:get(SeqId, TM, undefined) of
undefined ->
empty();
#state_machine{timers = Timers} = Machine ->
%% maybe timer has been canceled
case maps:is_key(Type, Timers) of
true ->
process_event(Type, Msg, TM, Machine);
_ ->
empty()
end
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
process_event(stop_timeout, _, TM, Machine) ->
process_manager(stop, #{}, Machine, TM);
process_event(
Event,
Msg,
TM,
#state_machine{
state = State,
transport = Transport
} = Machine
) ->
Result = emqx_coap_transport:State(Event, Msg, Transport),
iter(
[
proto,
fun process_observe_response/5,
next,
fun process_state_change/5,
transport,
fun process_transport_change/5,
timeouts,
fun process_timeouts/5,
fun process_manager/4
],
Result,
Machine,
TM
).
process_observe_response(
{response, {_, Msg}} = Response,
Result,
#state_machine{observe = 0} = Machine,
TM,
Iter
) ->
Result2 = proto_out(Response, Result),
case Msg#coap_message.method of
{ok, _} ->
iter(
Iter,
Result2#{next => observe},
Machine#state_machine{observe = observed},
TM
);
_ ->
iter(Iter, Result2, Machine, TM)
end;
process_observe_response(Proto, Result, Machine, TM, Iter) ->
iter(Iter, proto_out(Proto, Result), Machine, TM).
process_state_change(Next, Result, Machine, TM, Iter) ->
case Next of
stop ->
process_manager(stop, Result, Machine, TM);
_ ->
iter(
Iter,
Result,
cancel_state_timer(Machine#state_machine{state = Next}),
TM
)
end.
process_transport_change(Transport, Result, Machine, TM, Iter) ->
iter(Iter, Result, Machine#state_machine{transport = Transport}, TM).
process_timeouts([], Result, Machine, TM, Iter) ->
iter(Iter, Result, Machine, TM);
process_timeouts(
Timeouts,
Result,
#state_machine{
seq_id = SeqId,
timers = Timers
} = Machine,
TM,
Iter
) ->
NewTimers = lists:foldl(
fun
({state_timeout, _, _} = Timer, Acc) ->
process_timer(SeqId, Timer, Acc);
({stop_timeout, I}, Acc) ->
process_timer(SeqId, {stop_timeout, I, stop}, Acc)
end,
Timers,
Timeouts
),
iter(Iter, Result, Machine#state_machine{timers = NewTimers}, TM).
process_manager(stop, Result, #state_machine{seq_id = SeqId}, TM) ->
Result#{tm => delete_machine(SeqId, TM)};
process_manager(_, Result, #state_machine{seq_id = SeqId} = Machine2, TM) ->
Result#{tm => TM#{SeqId => Machine2}}.
cancel_state_timer(#state_machine{timers = Timers} = Machine) ->
case maps:get(state_timer, Timers, undefined) of
undefined ->
Machine;
Ref ->
_ = emqx_utils:cancel_timer(Ref),
Machine#state_machine{timers = maps:remove(state_timer, Timers)}
end.
process_timer(SeqId, {Type, Interval, Msg}, Timers) ->
Ref = emqx_utils:start_timer(Interval, {state_machine, {SeqId, Type, Msg}}),
Timers#{Type => Ref}.
-spec delete_machine(manager_key(), manager()) -> manager().
delete_machine(Id, Manager) ->
case find_machine(Id, Manager) of
undefined ->
Manager;
#state_machine{
seq_id = SeqId,
id = MachineId,
token = Token,
timers = Timers
} ->
lists:foreach(
fun({_, Ref}) ->
emqx_utils:cancel_timer(Ref)
end,
maps:to_list(Timers)
),
maps:without([SeqId, MachineId, ?TOKEN_ID(Token)], Manager)
end.
-spec find_machine(manager_key(), manager()) -> state_machine() | undefined.
find_machine({_, _} = Id, Manager) ->
find_machine_by_seqid(maps:get(Id, Manager, undefined), Manager);
find_machine(SeqId, Manager) ->
find_machine_by_seqid(SeqId, Manager).
-spec find_machine_by_seqid(seq_id() | undefined, manager()) ->
state_machine() | undefined.
find_machine_by_seqid(SeqId, Manager) ->
maps:get(SeqId, Manager, undefined).
-spec find_machine_by_keys(
list(manager_key()),
manager()
) -> state_machine() | undefined.
find_machine_by_keys([H | T], Manager) ->
case H of
?TOKEN_ID(<<>>) ->
find_machine_by_keys(T, Manager);
_ ->
case find_machine(H, Manager) of
undefined ->
find_machine_by_keys(T, Manager);
Machine ->
Machine
end
end;
find_machine_by_keys(_, _) ->
undefined.
-spec new_in_machine(state_machine_key(), manager()) ->
{state_machine(), manager()}.
new_in_machine(MachineId, #{seq_id := SeqId} = Manager) ->
Machine = #state_machine{
seq_id = SeqId,
id = MachineId,
state = idle,
timers = #{},
transport = emqx_coap_transport:new()
},
{Machine, Manager#{
seq_id := SeqId + 1,
SeqId => Machine,
MachineId => SeqId
}}.
-spec new_out_machine(state_machine_key(), any(), coap_message(), manager()) ->
{state_machine(), manager()}.
new_out_machine(
MachineId,
Ctx,
#coap_message{type = Type, token = Token, options = Opts},
#{seq_id := SeqId} = Manager
) ->
Observe = maps:get(observe, Opts, undefined),
Machine = #state_machine{
seq_id = SeqId,
id = MachineId,
token = Token,
observe = Observe,
state = idle,
timers = #{},
transport = emqx_coap_transport:new(Ctx)
},
Manager2 = Manager#{
seq_id := SeqId + 1,
SeqId => Machine,
MachineId => SeqId
},
{Machine,
if
Token =:= <<>> ->
Manager2;
Observe =:= 1 ->
TokenId = ?TOKEN_ID(Token),
delete_machine(TokenId, Manager2);
Type =:= con orelse Observe =:= 0 ->
TokenId = ?TOKEN_ID(Token),
case maps:get(TokenId, Manager, undefined) of
undefined ->
Manager2#{TokenId => SeqId};
_ ->
throw("token conflict")
end;
true ->
Manager2
end}.
alloc_message_id(#{next_msg_id := MsgId} = TM) ->
alloc_message_id(MsgId, TM).
alloc_message_id(MsgId, TM) ->
Id = {out, MsgId},
case maps:get(Id, TM, undefined) of
undefined ->
{MsgId, TM#{next_msg_id => next_message_id(MsgId)}};
_ ->
alloc_message_id(next_message_id(MsgId), TM)
end.
next_message_id(MsgId) ->
Next = MsgId + 1,
case Next >= ?MAX_MESSAGE_ID of
true ->
1;
false ->
Next
end.