emqx/apps/emqx_gateway_coap/src/emqx_coap_transport.erl

271 lines
7.2 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_transport).
-include("emqx_coap.hrl").
-include_lib("emqx/include/logger.hrl").
-define(ACK_TIMEOUT, 2000).
-define(ACK_RANDOM_FACTOR, 1000).
-define(MAX_RETRANSMIT, 4).
-define(EXCHANGE_LIFETIME, 247000).
-define(NON_LIFETIME, 145000).
-type request_context() :: any().
-record(transport, {
cache :: undefined | coap_message(),
req_context :: request_context(),
retry_interval :: non_neg_integer(),
retry_count :: non_neg_integer(),
observe :: non_neg_integer() | undefined
}).
-type transport() :: #transport{}.
-export([
new/0, new/1,
idle/3,
maybe_reset/3,
set_cache/2,
maybe_resend_4request/3,
wait_ack/3,
until_stop/3,
observe/3,
maybe_resend_4response/3
]).
-export_type([transport/0]).
-import(emqx_coap_medium, [
empty/0,
reset/2,
proto_out/2,
out/1, out/2,
proto_out/1,
reply/2
]).
-elvis([{elvis_style, atom_naming_convention, disable}]).
-elvis([{elvis_style, no_if_expression, disable}]).
%%--------------------------------------------------------------------
%% APIs
-spec new() -> transport().
new() ->
new(undefined).
new(ReqCtx) ->
#transport{
cache = undefined,
retry_interval = 0,
retry_count = 0,
req_context = ReqCtx
}.
idle(
in,
#coap_message{type = non, method = Method} = Msg,
_
) ->
case Method of
undefined ->
reset(Msg, #{next => stop});
_ ->
proto_out(
{request, Msg},
#{
next => until_stop,
timeouts =>
[{stop_timeout, ?NON_LIFETIME}]
}
)
end;
idle(
in,
#coap_message{type = con, method = Method} = Msg,
_
) ->
case Method of
undefined ->
reset(Msg, #{next => stop});
_ ->
proto_out(
{request, Msg},
#{
next => maybe_resend_4request,
timeouts => [{stop_timeout, ?EXCHANGE_LIFETIME}]
}
)
end;
idle(out, #coap_message{type = non} = Msg, _) ->
out(Msg, #{
next => maybe_reset,
timeouts => [{stop_timeout, ?NON_LIFETIME}]
});
idle(out, Msg, Transport) ->
_ = emqx_utils:rand_seed(),
Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR),
out(Msg, #{
next => wait_ack,
transport => Transport#transport{cache = Msg},
timeouts => [
{state_timeout, Timeout, ack_timeout},
{stop_timeout, ?EXCHANGE_LIFETIME}
]
}).
maybe_resend_4request(in, Msg, Transport) ->
maybe_resend(Msg, true, Transport).
maybe_resend_4response(in, Msg, Transport) ->
maybe_resend(Msg, false, Transport).
maybe_resend(Msg, IsExpecteReq, #transport{cache = Cache}) ->
IsExpected = emqx_coap_message:is_request(Msg) =:= IsExpecteReq,
case IsExpected of
true ->
case Cache of
undefined ->
%% handler in processing, ignore
empty();
_ ->
out(Cache)
end;
_ ->
reset(Msg, #{next => stop})
end.
maybe_reset(
in,
#coap_message{type = Type, method = Method} = Message,
#transport{req_context = Ctx} = Transport
) ->
Ret = #{next => stop},
CtxMsg = {Ctx, Message},
if
Type =:= reset ->
proto_out({reset, CtxMsg}, Ret);
is_tuple(Method) ->
on_response(
Message,
Transport,
if
Type =:= non -> until_stop;
true -> maybe_resend_4response
end
);
true ->
reset(Message, Ret)
end.
wait_ack(in, #coap_message{type = Type, method = Method} = Msg, #transport{req_context = Ctx}) ->
CtxMsg = {Ctx, Msg},
case Type of
reset ->
proto_out({reset, CtxMsg}, #{next => stop});
_ ->
case Method of
undefined ->
%% empty ack, keep transport to recv response
proto_out({ack, CtxMsg});
{_, _} ->
%% ack with payload
proto_out({response, CtxMsg}, #{next => stop});
_ ->
reset(Msg, #{next => stop})
end
end;
wait_ack(
state_timeout,
ack_timeout,
#transport{
cache = Msg,
retry_interval = Timeout,
retry_count = Count
} = Transport
) ->
case Count < ?MAX_RETRANSMIT of
true ->
Timeout2 = Timeout * 2,
out(
Msg,
#{
transport => Transport#transport{
retry_interval = Timeout2,
retry_count = Count + 1
},
timeouts => [{state_timeout, Timeout2, ack_timeout}]
}
);
_ ->
proto_out({ack_failure, Msg}, #{next_state => stop})
end.
observe(
in,
#coap_message{method = Method} = Message,
#transport{observe = Observe} = Transport
) ->
case Method of
{ok, _} ->
case emqx_coap_message:get_option(observe, Message, Observe) of
Observe ->
%% repeatd notify, ignore
empty();
NewObserve ->
on_response(
Message,
Transport#transport{observe = NewObserve},
?FUNCTION_NAME
)
end;
{error, _} ->
#{next => stop};
_ ->
emqx_coap_message:reset(Message)
end.
until_stop(_, _, _) ->
empty().
set_cache(Cache, Transport) ->
Transport#transport{cache = Cache}.
on_response(
#coap_message{type = Type} = Message,
#transport{req_context = Ctx} = Transport,
NextState
) ->
CtxMsg = {Ctx, Message},
if
Type =:= non ->
proto_out({response, CtxMsg}, #{next => NextState});
Type =:= con ->
Ack = emqx_coap_message:ack(Message),
proto_out(
{response, CtxMsg},
out(Ack, #{
next => NextState,
transport => Transport#transport{cache = Ack}
})
);
true ->
emqx_coap_message:reset(Message)
end.