chore(gw): improve some type specs
This commit is contained in:
parent
8be2aaf72c
commit
a2823f2ad6
|
@ -25,7 +25,8 @@
|
||||||
, validator/4
|
, validator/4
|
||||||
, metrics_inc/2
|
, metrics_inc/2
|
||||||
, run_hooks/3
|
, run_hooks/3
|
||||||
, send_request/2]).
|
, send_request/2
|
||||||
|
]).
|
||||||
|
|
||||||
-export([ init/2
|
-export([ init/2
|
||||||
, handle_in/2
|
, handle_in/2
|
||||||
|
@ -60,28 +61,43 @@
|
||||||
keepalive :: emqx_keepalive:keepalive() | undefined,
|
keepalive :: emqx_keepalive:keepalive() | undefined,
|
||||||
%% Timer
|
%% Timer
|
||||||
timers :: #{atom() => disable | undefined | reference()},
|
timers :: #{atom() => disable | undefined | reference()},
|
||||||
|
%% Connection mode
|
||||||
connection_required :: boolean(),
|
connection_required :: boolean(),
|
||||||
|
%% Connection State
|
||||||
conn_state :: idle | connected | disconnected,
|
conn_state :: idle | connected | disconnected,
|
||||||
|
%% Session token to identity this connection
|
||||||
token :: binary() | undefined
|
token :: binary() | undefined
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type channel() :: #channel{}.
|
-type channel() :: #channel{}.
|
||||||
|
|
||||||
|
-type conn_state() :: idle | connecting | connected | disconnected.
|
||||||
|
|
||||||
|
-type reply() :: {outgoing, coap_message()}
|
||||||
|
| {outgoing, [coap_message()]}
|
||||||
|
| {event, conn_state()|updated}
|
||||||
|
| {close, Reason :: atom()}.
|
||||||
|
|
||||||
|
-type replies() :: reply() | [reply()].
|
||||||
|
|
||||||
-define(TOKEN_MAXIMUM, 4294967295).
|
-define(TOKEN_MAXIMUM, 4294967295).
|
||||||
|
|
||||||
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
|
||||||
|
|
||||||
-define(DEF_IDLE_TIME, timer:seconds(30)).
|
-define(DEF_IDLE_TIME, timer:seconds(30)).
|
||||||
-define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)).
|
-define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)).
|
||||||
|
|
||||||
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
|
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec info(channel()) -> emqx_types:infos().
|
||||||
info(Channel) ->
|
info(Channel) ->
|
||||||
maps:from_list(info(?INFO_KEYS, Channel)).
|
maps:from_list(info(?INFO_KEYS, Channel)).
|
||||||
|
|
||||||
|
-spec info(list(atom())|atom(), channel()) -> term().
|
||||||
info(Keys, Channel) when is_list(Keys) ->
|
info(Keys, Channel) when is_list(Keys) ->
|
||||||
[{Key, info(Key, Channel)} || Key <- Keys];
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
||||||
|
|
||||||
|
@ -92,15 +108,17 @@ info(conn_state, #channel{conn_state = CState}) ->
|
||||||
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
||||||
ClientInfo;
|
ClientInfo;
|
||||||
info(session, #channel{session = Session}) ->
|
info(session, #channel{session = Session}) ->
|
||||||
emqx_misc:maybe_apply(fun emqx_session:info/1, Session);
|
emqx_misc:maybe_apply(fun emqx_coap_session:info/1, Session);
|
||||||
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||||
ClientId;
|
ClientId;
|
||||||
info(ctx, #channel{ctx = Ctx}) ->
|
info(ctx, #channel{ctx = Ctx}) ->
|
||||||
Ctx.
|
Ctx.
|
||||||
|
|
||||||
|
-spec stats(channel()) -> emqx_types:stats().
|
||||||
stats(_) ->
|
stats(_) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
|
-spec init(map(), map()) -> channel().
|
||||||
init(ConnInfoT = #{peername := {PeerHost, _},
|
init(ConnInfoT = #{peername := {PeerHost, _},
|
||||||
sockname := {_, SockPort}},
|
sockname := {_, SockPort}},
|
||||||
#{ctx := Ctx} = Config) ->
|
#{ctx := Ctx} = Config) ->
|
||||||
|
@ -126,8 +144,8 @@ init(ConnInfoT = #{peername := {PeerHost, _},
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
|
||||||
%% because it is possible to disconnect after init, and then trigger the $event.disconnected hook
|
%% because it is possible to disconnect after init, and then trigger the
|
||||||
%% and these two fields are required in the hook
|
%% $event.disconnected hook and these two fields are required in the hook
|
||||||
ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>},
|
ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>},
|
||||||
|
|
||||||
Heartbeat = ?GET_IDLE_TIME(Config),
|
Heartbeat = ?GET_IDLE_TIME(Config),
|
||||||
|
@ -144,13 +162,19 @@ init(ConnInfoT = #{peername := {PeerHost, _},
|
||||||
validator(Type, Topic, Ctx, ClientInfo) ->
|
validator(Type, Topic, Ctx, ClientInfo) ->
|
||||||
emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
|
emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
|
||||||
|
|
||||||
-spec send_request(pid(), emqx_coap_message()) -> any().
|
-spec send_request(pid(), coap_message()) -> any().
|
||||||
send_request(Channel, Request) ->
|
send_request(Channel, Request) ->
|
||||||
gen_server:send_request(Channel, {?FUNCTION_NAME, Request}).
|
gen_server:send_request(Channel, {?FUNCTION_NAME, Request}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle incoming packet
|
%% Handle incoming packet
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec handle_in(coap_message() | {frame_error, any()}, channel())
|
||||||
|
-> {ok, channel()}
|
||||||
|
| {ok, replies(), channel()}
|
||||||
|
| {shutdown, Reason :: term(), channel()}
|
||||||
|
| {shutdown, Reason :: term(), replies(), channel()}.
|
||||||
handle_in(Msg, ChannleT) ->
|
handle_in(Msg, ChannleT) ->
|
||||||
Channel = ensure_keepalive_timer(ChannleT),
|
Channel = ensure_keepalive_timer(ChannleT),
|
||||||
case emqx_coap_message:is_request(Msg) of
|
case emqx_coap_message:is_request(Msg) of
|
||||||
|
@ -170,6 +194,7 @@ handle_deliver(Delivers, #channel{session = Session,
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle timeout
|
%% Handle timeout
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) ->
|
handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) ->
|
||||||
case emqx_keepalive:check(NewVal, KeepAlive) of
|
case emqx_keepalive:check(NewVal, KeepAlive) of
|
||||||
{ok, NewKeepAlive} ->
|
{ok, NewKeepAlive} ->
|
||||||
|
@ -191,10 +216,28 @@ handle_timeout(_, _, Channel) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle call
|
%% Handle call
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(handle_call(Req :: term(), From :: term(), channel())
|
||||||
|
-> {reply, Reply :: term(), channel()}
|
||||||
|
| {reply, Reply :: term(), replies(), channel()}
|
||||||
|
| {shutdown, Reason :: term(), Reply :: term(), channel()}
|
||||||
|
| {shutdown, Reason :: term(), Reply :: term(), coap_message(), channel()}).
|
||||||
handle_call({send_request, Msg}, From, Channel) ->
|
handle_call({send_request, Msg}, From, Channel) ->
|
||||||
Result = call_session(handle_out, {{send_request, From}, Msg}, Channel),
|
Result = call_session(handle_out, {{send_request, From}, Msg}, Channel),
|
||||||
erlang:setelement(1, Result, noreply);
|
erlang:setelement(1, Result, noreply);
|
||||||
|
|
||||||
|
handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) ->
|
||||||
|
{reply, {error, nosupport}, Channel};
|
||||||
|
|
||||||
|
handle_call({unsubscribe, _Topic}, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
|
handle_call(subscriptions, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
|
handle_call(kick, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
handle_call(Req, _From, Channel) ->
|
handle_call(Req, _From, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, Channel}.
|
{reply, ignored, Channel}.
|
||||||
|
@ -202,6 +245,9 @@ handle_call(Req, _From, Channel) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Cast
|
%% Handle Cast
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec handle_cast(Req :: term(), channel())
|
||||||
|
-> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
|
||||||
handle_cast(Req, Channel) ->
|
handle_cast(Req, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
@ -209,9 +255,9 @@ handle_cast(Req, Channel) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Info
|
%% Handle Info
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
handle_info({subscribe, _}, Channel) ->
|
|
||||||
{ok, Channel};
|
|
||||||
|
|
||||||
|
-spec(handle_info(Info :: term(), channel())
|
||||||
|
-> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
@ -412,6 +458,7 @@ ensure_disconnected(Reason, Channel = #channel{
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Call Chain
|
%% Call Chain
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
call_session(Fun, Msg, #channel{session = Session} = Channel) ->
|
call_session(Fun, Msg, #channel{session = Session} = Channel) ->
|
||||||
Result = emqx_coap_session:Fun(Msg, Session),
|
Result = emqx_coap_session:Fun(Msg, Session),
|
||||||
handle_result(Result, Channel).
|
handle_result(Result, Channel).
|
||||||
|
|
|
@ -18,17 +18,15 @@
|
||||||
|
|
||||||
-behaviour(emqx_gateway_frame).
|
-behaviour(emqx_gateway_frame).
|
||||||
|
|
||||||
%% emqx_gateway_frame API
|
%% emqx_gateway_frame callbacks
|
||||||
-export([ initial_parse_state/1
|
-export([ initial_parse_state/1
|
||||||
, serialize_opts/0
|
, serialize_opts/0
|
||||||
, serialize_pkt/2
|
, serialize_pkt/2
|
||||||
, parse/2
|
, parse/2
|
||||||
, format/1
|
, format/1
|
||||||
, type/1
|
, type/1
|
||||||
, is_message/1]).
|
, is_message/1
|
||||||
|
]).
|
||||||
%% API
|
|
||||||
-export([]).
|
|
||||||
|
|
||||||
-include("include/emqx_coap.hrl").
|
-include("include/emqx_coap.hrl").
|
||||||
-include("apps/emqx/include/types.hrl").
|
-include("apps/emqx/include/types.hrl").
|
||||||
|
@ -58,9 +56,11 @@
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec initial_parse_state(map()) -> emqx_gateway_frame:parse_state().
|
||||||
initial_parse_state(_) ->
|
initial_parse_state(_) ->
|
||||||
#{}.
|
#{}.
|
||||||
|
|
||||||
|
-spec serialize_opts() -> emqx_gateway_frame:serialize_options().
|
||||||
serialize_opts() ->
|
serialize_opts() ->
|
||||||
#{}.
|
#{}.
|
||||||
|
|
||||||
|
@ -235,6 +235,9 @@ method_to_class_code(Method) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% parse
|
%% parse
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec parse(binary(), emqx_gateway_frame:parse_state())
|
||||||
|
-> emqx_gateway_frame:parse_result().
|
||||||
parse(<<?VERSION:2, Type:2, 0:4, 0:3, 0:5, MsgId:16>>, ParseState) ->
|
parse(<<?VERSION:2, Type:2, 0:4, 0:3, 0:5, MsgId:16>>, ParseState) ->
|
||||||
{ok,
|
{ok,
|
||||||
#coap_message{ type = decode_type(Type)
|
#coap_message{ type = decode_type(Type)
|
||||||
|
|
|
@ -22,7 +22,8 @@
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([ new/0
|
-export([ new/0
|
||||||
, process_subscribe/4]).
|
, process_subscribe/4
|
||||||
|
]).
|
||||||
|
|
||||||
-export([ info/1
|
-export([ info/1
|
||||||
, info/2
|
, info/2
|
||||||
|
|
|
@ -63,7 +63,7 @@
|
||||||
|
|
||||||
-type event_result(State) ::
|
-type event_result(State) ::
|
||||||
#{next => State,
|
#{next => State,
|
||||||
outgoing => emqx_coap_message(),
|
outgoing => coap_message(),
|
||||||
timeouts => list(ttimeout()),
|
timeouts => list(ttimeout()),
|
||||||
has_sub => undefined | sub_register(),
|
has_sub => undefined | sub_register(),
|
||||||
transport => emqx_coap_transport:transprot()}.
|
transport => emqx_coap_transport:transprot()}.
|
||||||
|
@ -75,12 +75,13 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec new() -> manager().
|
||||||
new() ->
|
new() ->
|
||||||
#{ seq_id => 1
|
#{ seq_id => 1
|
||||||
, next_msg_id => rand:uniform(?MAX_MESSAGE_ID)
|
, next_msg_id => rand:uniform(?MAX_MESSAGE_ID)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%% client request
|
|
||||||
handle_request(#coap_message{id = MsgId} = Msg, TM) ->
|
handle_request(#coap_message{id = MsgId} = Msg, TM) ->
|
||||||
Id = {in, MsgId},
|
Id = {in, MsgId},
|
||||||
case find_machine(Id, TM) of
|
case find_machine(Id, TM) of
|
||||||
|
@ -296,7 +297,7 @@ new_in_machine(MachineId, #{seq_id := SeqId} = Manager) ->
|
||||||
SeqId => Machine,
|
SeqId => Machine,
|
||||||
MachineId => SeqId}}.
|
MachineId => SeqId}}.
|
||||||
|
|
||||||
-spec new_out_machine(state_machine_key(), any(), emqx_coap_message(), manager()) ->
|
-spec new_out_machine(state_machine_key(), any(), coap_message(), manager()) ->
|
||||||
{state_machine(), manager()}.
|
{state_machine(), manager()}.
|
||||||
new_out_machine(MachineId,
|
new_out_machine(MachineId,
|
||||||
Ctx,
|
Ctx,
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
|
|
||||||
-type request_context() :: any().
|
-type request_context() :: any().
|
||||||
|
|
||||||
-record(transport, { cache :: undefined | emqx_coap_message()
|
-record(transport, { cache :: undefined | coap_message()
|
||||||
, req_context :: request_context()
|
, req_context :: request_context()
|
||||||
, retry_interval :: non_neg_integer()
|
, retry_interval :: non_neg_integer()
|
||||||
, retry_count :: non_neg_integer()
|
, retry_count :: non_neg_integer()
|
||||||
|
@ -26,7 +26,6 @@
|
||||||
|
|
||||||
-export_type([transport/0]).
|
-export_type([transport/0]).
|
||||||
|
|
||||||
-import(emqx_coap_message, [reset/1]).
|
|
||||||
-import(emqx_coap_medium, [ empty/0, reset/2, proto_out/2
|
-import(emqx_coap_medium, [ empty/0, reset/2, proto_out/2
|
||||||
, out/1, out/2, proto_out/1
|
, out/1, out/2, proto_out/1
|
||||||
, reply/2]).
|
, reply/2]).
|
||||||
|
@ -166,7 +165,7 @@ observe(in,
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
#{next => stop};
|
#{next => stop};
|
||||||
_ ->
|
_ ->
|
||||||
reset(Message)
|
emqx_coap_message:reset(Message)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
until_stop(_, _, _) ->
|
until_stop(_, _, _) ->
|
||||||
|
@ -187,5 +186,5 @@ on_response(#coap_message{type = Type} = Message,
|
||||||
out(Ack, #{next => NextState,
|
out(Ack, #{next => NextState,
|
||||||
transport => Transport#transport{cache = Ack}}));
|
transport => Transport#transport{cache = Ack}}));
|
||||||
true ->
|
true ->
|
||||||
reset(Message)
|
emqx_coap_message:reset(Message)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
|
|
||||||
-define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}).
|
-define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}).
|
||||||
-define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}).
|
-define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}).
|
||||||
-define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}).
|
-define(SUBOPTS, #{qos => 0, rh => 1, rap => 0, nl => 0, is_new => false}).
|
||||||
|
|
||||||
%% TODO maybe can merge this code into emqx_coap_session, simplify the call chain
|
%% TODO maybe can merge this code into emqx_coap_session, simplify the call chain
|
||||||
|
|
||||||
|
|
|
@ -73,4 +73,4 @@
|
||||||
, options = #{}
|
, options = #{}
|
||||||
, payload = <<>>}).
|
, payload = <<>>}).
|
||||||
|
|
||||||
-type emqx_coap_message() :: #coap_message{}.
|
-type coap_message() :: #coap_message{}.
|
||||||
|
|
|
@ -194,8 +194,8 @@ schema("/gateway/:name/authentication/users") ->
|
||||||
, responses =>
|
, responses =>
|
||||||
?STANDARD_RESP(
|
?STANDARD_RESP(
|
||||||
#{ 200 => emqx_dashboard_swagger:schema_with_example(
|
#{ 200 => emqx_dashboard_swagger:schema_with_example(
|
||||||
ref(emqx_authn_api, response_user),
|
ref(emqx_authn_api, response_users),
|
||||||
emqx_authn_api:response_user_examples())
|
emqx_authn_api:response_users_example())
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
post =>
|
post =>
|
||||||
|
|
|
@ -148,6 +148,10 @@ subscriptions(get, #{ bindings := #{name := Name0,
|
||||||
ClientId = emqx_mgmt_util:urldecode(ClientId0),
|
ClientId = emqx_mgmt_util:urldecode(ClientId0),
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
|
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
|
||||||
|
{error, nosupport} ->
|
||||||
|
return_http_error(405, <<"Not support to list subscriptions">>);
|
||||||
|
{error, noimpl} ->
|
||||||
|
return_http_error(501, <<"Not implemented now">>);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
return_http_error(500, Reason);
|
return_http_error(500, Reason);
|
||||||
{ok, Subs} ->
|
{ok, Subs} ->
|
||||||
|
@ -168,6 +172,14 @@ subscriptions(post, #{ bindings := #{name := Name0,
|
||||||
{Topic, QoS} ->
|
{Topic, QoS} ->
|
||||||
case emqx_gateway_http:client_subscribe(
|
case emqx_gateway_http:client_subscribe(
|
||||||
GwName, ClientId, Topic, QoS) of
|
GwName, ClientId, Topic, QoS) of
|
||||||
|
{error, nosupport} ->
|
||||||
|
return_http_error(
|
||||||
|
405,
|
||||||
|
<<"Not support to add a subscription">>);
|
||||||
|
{error, noimpl} ->
|
||||||
|
return_http_error(
|
||||||
|
501,
|
||||||
|
<<"Not implemented now">>);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
return_http_error(404, Reason);
|
return_http_error(404, Reason);
|
||||||
ok ->
|
ok ->
|
||||||
|
|
|
@ -274,15 +274,18 @@ kickout_client(Node, GwName, ClientId) ->
|
||||||
-> {error, any()}
|
-> {error, any()}
|
||||||
| {ok, list()}.
|
| {ok, list()}.
|
||||||
list_client_subscriptions(GwName, ClientId) ->
|
list_client_subscriptions(GwName, ClientId) ->
|
||||||
%% Get the subscriptions from session-info
|
|
||||||
with_channel(GwName, ClientId,
|
with_channel(GwName, ClientId,
|
||||||
fun(Pid) ->
|
fun(Pid) ->
|
||||||
Subs = emqx_gateway_conn:call(
|
case emqx_gateway_conn:call(
|
||||||
Pid,
|
Pid,
|
||||||
subscriptions, ?DEFAULT_CALL_TIMEOUT),
|
subscriptions, ?DEFAULT_CALL_TIMEOUT) of
|
||||||
|
{ok, Subs} ->
|
||||||
{ok, lists:map(fun({Topic, SubOpts}) ->
|
{ok, lists:map(fun({Topic, SubOpts}) ->
|
||||||
SubOpts#{topic => Topic}
|
SubOpts#{topic => Topic}
|
||||||
end, Subs)}
|
end, Subs)};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
-spec client_subscribe(gateway_name(), emqx_type:clientid(),
|
-spec client_subscribe(gateway_name(), emqx_type:clientid(),
|
||||||
|
@ -330,7 +333,9 @@ return_http_error(Code, Msg) ->
|
||||||
codestr(400) -> 'BAD_REQUEST';
|
codestr(400) -> 'BAD_REQUEST';
|
||||||
codestr(401) -> 'NOT_SUPPORTED_NOW';
|
codestr(401) -> 'NOT_SUPPORTED_NOW';
|
||||||
codestr(404) -> 'RESOURCE_NOT_FOUND';
|
codestr(404) -> 'RESOURCE_NOT_FOUND';
|
||||||
codestr(500) -> 'UNKNOW_ERROR'.
|
codestr(405) -> 'METHOD_NOT_ALLOWED';
|
||||||
|
codestr(500) -> 'UNKNOW_ERROR';
|
||||||
|
codestr(501) -> 'NOT_IMPLEMENTED'.
|
||||||
|
|
||||||
-spec with_authn(binary(), function()) -> any().
|
-spec with_authn(binary(), function()) -> any().
|
||||||
with_authn(GwName0, Fun) ->
|
with_authn(GwName0, Fun) ->
|
||||||
|
|
|
@ -449,7 +449,7 @@ it has two purposes:
|
||||||
sc(ref(clientinfo_override),
|
sc(ref(clientinfo_override),
|
||||||
#{ desc => ""
|
#{ desc => ""
|
||||||
})}
|
})}
|
||||||
, {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication_schema()}
|
, {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()}
|
||||||
].
|
].
|
||||||
|
|
||||||
common_listener_opts() ->
|
common_listener_opts() ->
|
||||||
|
@ -468,7 +468,7 @@ common_listener_opts() ->
|
||||||
sc(integer(),
|
sc(integer(),
|
||||||
#{ default => 1000
|
#{ default => 1000
|
||||||
})}
|
})}
|
||||||
, {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication_schema()}
|
, {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()}
|
||||||
, {mountpoint,
|
, {mountpoint,
|
||||||
sc(binary(),
|
sc(binary(),
|
||||||
#{ default => undefined
|
#{ default => undefined
|
||||||
|
|
|
@ -174,6 +174,7 @@ handle_timeout(_, _, Channel) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle call
|
%% Handle call
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) ->
|
handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) ->
|
||||||
Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session),
|
Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session),
|
||||||
{reply, {ok, Result}, Channel};
|
{reply, {ok, Result}, Channel};
|
||||||
|
@ -182,6 +183,21 @@ handle_call({send_cmd, Cmd}, _From, Channel) ->
|
||||||
{ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel),
|
{ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel),
|
||||||
{reply, ok, Outs, Channel2};
|
{reply, ok, Outs, Channel2};
|
||||||
|
|
||||||
|
handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
|
handle_call({unsubscribe, _Topic}, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
|
handle_call(subscriptions, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
|
handle_call(kick, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
|
handle_call(discard, _From, Channel) ->
|
||||||
|
{reply, {error, noimpl}, Channel};
|
||||||
|
|
||||||
handle_call(Req, _From, Channel) ->
|
handle_call(Req, _From, Channel) ->
|
||||||
?SLOG(error, #{ msg => "unexpected_call"
|
?SLOG(error, #{ msg => "unexpected_call"
|
||||||
, call => Req
|
, call => Req
|
||||||
|
|
|
@ -42,7 +42,7 @@
|
||||||
-type request_context() :: map().
|
-type request_context() :: map().
|
||||||
|
|
||||||
-type timestamp() :: non_neg_integer().
|
-type timestamp() :: non_neg_integer().
|
||||||
-type queued_request() :: {timestamp(), request_context(), emqx_coap_message()}.
|
-type queued_request() :: {timestamp(), request_context(), coap_message()}.
|
||||||
|
|
||||||
-type cmd_path() :: binary().
|
-type cmd_path() :: binary().
|
||||||
-type cmd_type() :: binary().
|
-type cmd_type() :: binary().
|
||||||
|
@ -120,7 +120,7 @@ new() ->
|
||||||
, cmd_record = #{queue => queue:new()}
|
, cmd_record = #{queue => queue:new()}
|
||||||
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
|
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
|
||||||
|
|
||||||
-spec init(emqx_coap_message(), binary(), function(), session()) -> map().
|
-spec init(coap_message(), binary(), function(), session()) -> map().
|
||||||
init(#coap_message{options = Opts,
|
init(#coap_message{options = Opts,
|
||||||
payload = Payload} = Msg, MountPoint, WithContext, Session) ->
|
payload = Payload} = Msg, MountPoint, WithContext, Session) ->
|
||||||
Query = maps:get(uri_query, Opts),
|
Query = maps:get(uri_query, Opts),
|
||||||
|
@ -347,7 +347,7 @@ get_lifetime(#{<<"lt">> := _} = NewRegInfo, _) ->
|
||||||
get_lifetime(_, OldRegInfo) ->
|
get_lifetime(_, OldRegInfo) ->
|
||||||
get_lifetime(OldRegInfo).
|
get_lifetime(OldRegInfo).
|
||||||
|
|
||||||
-spec update(emqx_coap_message(), function(), binary(), session()) -> map().
|
-spec update(coap_message(), function(), binary(), session()) -> map().
|
||||||
update(#coap_message{options = Opts, payload = Payload} = Msg,
|
update(#coap_message{options = Opts, payload = Payload} = Msg,
|
||||||
WithContext,
|
WithContext,
|
||||||
CmdType,
|
CmdType,
|
||||||
|
|
|
@ -73,16 +73,16 @@
|
||||||
transaction :: #{binary() => list()}
|
transaction :: #{binary() => list()}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(channel() :: #channel{}).
|
-type channel() :: #channel{}.
|
||||||
|
|
||||||
-type(conn_state() :: idle | connecting | connected | disconnected).
|
-type conn_state() :: idle | connecting | connected | disconnected.
|
||||||
|
|
||||||
-type(reply() :: {outgoing, stomp_frame()}
|
-type reply() :: {outgoing, stomp_frame()}
|
||||||
| {outgoing, [stomp_frame()]}
|
| {outgoing, [stomp_frame()]}
|
||||||
| {event, conn_state()|updated}
|
| {event, conn_state()|updated}
|
||||||
| {close, Reason :: atom()}).
|
| {close, Reason :: atom()}.
|
||||||
|
|
||||||
-type(replies() :: reply() | [reply()]).
|
-type replies() :: reply() | [reply()].
|
||||||
|
|
||||||
-define(TIMER_TABLE, #{
|
-define(TIMER_TABLE, #{
|
||||||
incoming_timer => keepalive,
|
incoming_timer => keepalive,
|
||||||
|
@ -155,7 +155,7 @@ setting_peercert_infos(Peercert, ClientInfo) ->
|
||||||
info(Channel) ->
|
info(Channel) ->
|
||||||
maps:from_list(info(?INFO_KEYS, Channel)).
|
maps:from_list(info(?INFO_KEYS, Channel)).
|
||||||
|
|
||||||
-spec(info(list(atom())|atom(), channel()) -> term()).
|
-spec info(list(atom())|atom(), channel()) -> term().
|
||||||
info(Keys, Channel) when is_list(Keys) ->
|
info(Keys, Channel) when is_list(Keys) ->
|
||||||
[{Key, info(Key, Channel)} || Key <- Keys];
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||||
info(ctx, #channel{ctx = Ctx}) ->
|
info(ctx, #channel{ctx = Ctx}) ->
|
||||||
Ctx.
|
Ctx.
|
||||||
|
|
||||||
-spec(stats(channel()) -> emqx_types:stats()).
|
-spec stats(channel()) -> emqx_types:stats().
|
||||||
stats(#channel{subscriptions = Subs}) ->
|
stats(#channel{subscriptions = Subs}) ->
|
||||||
[{subscriptions_cnt, length(Subs)}].
|
[{subscriptions_cnt, length(Subs)}].
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue