271 lines
7.2 KiB
Erlang
271 lines
7.2 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_coap_transport).
|
|
|
|
-include("emqx_coap.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-define(ACK_TIMEOUT, 2000).
|
|
-define(ACK_RANDOM_FACTOR, 1000).
|
|
-define(MAX_RETRANSMIT, 4).
|
|
-define(EXCHANGE_LIFETIME, 247000).
|
|
-define(NON_LIFETIME, 145000).
|
|
|
|
-type request_context() :: any().
|
|
|
|
-record(transport, {
|
|
cache :: undefined | coap_message(),
|
|
req_context :: request_context(),
|
|
retry_interval :: non_neg_integer(),
|
|
retry_count :: non_neg_integer(),
|
|
observe :: non_neg_integer() | undefined
|
|
}).
|
|
|
|
-type transport() :: #transport{}.
|
|
|
|
-export([
|
|
new/0, new/1,
|
|
idle/3,
|
|
maybe_reset/3,
|
|
set_cache/2,
|
|
maybe_resend_4request/3,
|
|
wait_ack/3,
|
|
until_stop/3,
|
|
observe/3,
|
|
maybe_resend_4response/3
|
|
]).
|
|
|
|
-export_type([transport/0]).
|
|
|
|
-import(emqx_coap_medium, [
|
|
empty/0,
|
|
reset/2,
|
|
proto_out/2,
|
|
out/1, out/2,
|
|
proto_out/1,
|
|
reply/2
|
|
]).
|
|
|
|
-elvis([{elvis_style, atom_naming_convention, disable}]).
|
|
-elvis([{elvis_style, no_if_expression, disable}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
|
|
-spec new() -> transport().
|
|
new() ->
|
|
new(undefined).
|
|
|
|
new(ReqCtx) ->
|
|
#transport{
|
|
cache = undefined,
|
|
retry_interval = 0,
|
|
retry_count = 0,
|
|
req_context = ReqCtx
|
|
}.
|
|
|
|
idle(
|
|
in,
|
|
#coap_message{type = non, method = Method} = Msg,
|
|
_
|
|
) ->
|
|
case Method of
|
|
undefined ->
|
|
reset(Msg, #{next => stop});
|
|
_ ->
|
|
proto_out(
|
|
{request, Msg},
|
|
#{
|
|
next => until_stop,
|
|
timeouts =>
|
|
[{stop_timeout, ?NON_LIFETIME}]
|
|
}
|
|
)
|
|
end;
|
|
idle(
|
|
in,
|
|
#coap_message{type = con, method = Method} = Msg,
|
|
_
|
|
) ->
|
|
case Method of
|
|
undefined ->
|
|
reset(Msg, #{next => stop});
|
|
_ ->
|
|
proto_out(
|
|
{request, Msg},
|
|
#{
|
|
next => maybe_resend_4request,
|
|
timeouts => [{stop_timeout, ?EXCHANGE_LIFETIME}]
|
|
}
|
|
)
|
|
end;
|
|
idle(out, #coap_message{type = non} = Msg, _) ->
|
|
out(Msg, #{
|
|
next => maybe_reset,
|
|
timeouts => [{stop_timeout, ?NON_LIFETIME}]
|
|
});
|
|
idle(out, Msg, Transport) ->
|
|
_ = emqx_utils:rand_seed(),
|
|
Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR),
|
|
out(Msg, #{
|
|
next => wait_ack,
|
|
transport => Transport#transport{cache = Msg},
|
|
timeouts => [
|
|
{state_timeout, Timeout, ack_timeout},
|
|
{stop_timeout, ?EXCHANGE_LIFETIME}
|
|
]
|
|
}).
|
|
|
|
maybe_resend_4request(in, Msg, Transport) ->
|
|
maybe_resend(Msg, true, Transport).
|
|
|
|
maybe_resend_4response(in, Msg, Transport) ->
|
|
maybe_resend(Msg, false, Transport).
|
|
|
|
maybe_resend(Msg, IsExpecteReq, #transport{cache = Cache}) ->
|
|
IsExpected = emqx_coap_message:is_request(Msg) =:= IsExpecteReq,
|
|
case IsExpected of
|
|
true ->
|
|
case Cache of
|
|
undefined ->
|
|
%% handler in processing, ignore
|
|
empty();
|
|
_ ->
|
|
out(Cache)
|
|
end;
|
|
_ ->
|
|
reset(Msg, #{next => stop})
|
|
end.
|
|
|
|
maybe_reset(
|
|
in,
|
|
#coap_message{type = Type, method = Method} = Message,
|
|
#transport{req_context = Ctx} = Transport
|
|
) ->
|
|
Ret = #{next => stop},
|
|
CtxMsg = {Ctx, Message},
|
|
if
|
|
Type =:= reset ->
|
|
proto_out({reset, CtxMsg}, Ret);
|
|
is_tuple(Method) ->
|
|
on_response(
|
|
Message,
|
|
Transport,
|
|
if
|
|
Type =:= non -> until_stop;
|
|
true -> maybe_resend_4response
|
|
end
|
|
);
|
|
true ->
|
|
reset(Message, Ret)
|
|
end.
|
|
|
|
wait_ack(in, #coap_message{type = Type, method = Method} = Msg, #transport{req_context = Ctx}) ->
|
|
CtxMsg = {Ctx, Msg},
|
|
case Type of
|
|
reset ->
|
|
proto_out({reset, CtxMsg}, #{next => stop});
|
|
_ ->
|
|
case Method of
|
|
undefined ->
|
|
%% empty ack, keep transport to recv response
|
|
proto_out({ack, CtxMsg});
|
|
{_, _} ->
|
|
%% ack with payload
|
|
proto_out({response, CtxMsg}, #{next => stop});
|
|
_ ->
|
|
reset(Msg, #{next => stop})
|
|
end
|
|
end;
|
|
wait_ack(
|
|
state_timeout,
|
|
ack_timeout,
|
|
#transport{
|
|
cache = Msg,
|
|
retry_interval = Timeout,
|
|
retry_count = Count
|
|
} = Transport
|
|
) ->
|
|
case Count < ?MAX_RETRANSMIT of
|
|
true ->
|
|
Timeout2 = Timeout * 2,
|
|
out(
|
|
Msg,
|
|
#{
|
|
transport => Transport#transport{
|
|
retry_interval = Timeout2,
|
|
retry_count = Count + 1
|
|
},
|
|
timeouts => [{state_timeout, Timeout2, ack_timeout}]
|
|
}
|
|
);
|
|
_ ->
|
|
proto_out({ack_failure, Msg}, #{next_state => stop})
|
|
end.
|
|
|
|
observe(
|
|
in,
|
|
#coap_message{method = Method} = Message,
|
|
#transport{observe = Observe} = Transport
|
|
) ->
|
|
case Method of
|
|
{ok, _} ->
|
|
case emqx_coap_message:get_option(observe, Message, Observe) of
|
|
Observe ->
|
|
%% repeatd notify, ignore
|
|
empty();
|
|
NewObserve ->
|
|
on_response(
|
|
Message,
|
|
Transport#transport{observe = NewObserve},
|
|
?FUNCTION_NAME
|
|
)
|
|
end;
|
|
{error, _} ->
|
|
#{next => stop};
|
|
_ ->
|
|
emqx_coap_message:reset(Message)
|
|
end.
|
|
|
|
until_stop(_, _, _) ->
|
|
empty().
|
|
|
|
set_cache(Cache, Transport) ->
|
|
Transport#transport{cache = Cache}.
|
|
|
|
on_response(
|
|
#coap_message{type = Type} = Message,
|
|
#transport{req_context = Ctx} = Transport,
|
|
NextState
|
|
) ->
|
|
CtxMsg = {Ctx, Message},
|
|
if
|
|
Type =:= non ->
|
|
proto_out({response, CtxMsg}, #{next => NextState});
|
|
Type =:= con ->
|
|
Ack = emqx_coap_message:ack(Message),
|
|
proto_out(
|
|
{response, CtxMsg},
|
|
out(Ack, #{
|
|
next => NextState,
|
|
transport => Transport#transport{cache = Ack}
|
|
})
|
|
);
|
|
true ->
|
|
emqx_coap_message:reset(Message)
|
|
end.
|