diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl index 87410f7d8..5d3b81d54 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl @@ -32,6 +32,12 @@ -type serialize_options() :: map(). +-export_type([ parse_state/0 + , parse_result/0 + , serialize_options/0 + , frame/0 + ]). + %% Callbacks %% @doc Initial the frame parser states diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index e1dad1dee..ab079b587 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -25,7 +25,8 @@ , validator/4 , metrics_inc/2 , run_hooks/3 - , send_request/2]). + , send_request/2 + ]). -export([ init/2 , handle_in/2 @@ -48,59 +49,76 @@ -define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM). -record(channel, { - %% Context - ctx :: emqx_gateway_ctx:context(), - %% Connection Info - conninfo :: emqx_types:conninfo(), - %% Client Info - clientinfo :: emqx_types:clientinfo(), - %% Session - session :: emqx_coap_session:session() | undefined, - %% Keepalive - keepalive :: emqx_keepalive:keepalive() | undefined, - %% Timer - timers :: #{atom() => disable | undefined | reference()}, - - connection_required :: boolean(), - - conn_state :: idle | connected | disconnected, - - token :: binary() | undefined - }). + %% Context + ctx :: emqx_gateway_ctx:context(), + %% Connection Info + conninfo :: emqx_types:conninfo(), + %% Client Info + clientinfo :: emqx_types:clientinfo(), + %% Session + session :: emqx_coap_session:session() | undefined, + %% Keepalive + keepalive :: emqx_keepalive:keepalive() | undefined, + %% Timer + timers :: #{atom() => disable | undefined | reference()}, + %% Connection mode + connection_required :: boolean(), + %% Connection State + conn_state :: conn_state(), + %% Session token to identity this connection + token :: binary() | undefined + }). -type channel() :: #channel{}. + +-type conn_state() :: idle | connecting | connected | disconnected. + +-type reply() :: {outgoing, coap_message()} + | {outgoing, [coap_message()]} + | {event, conn_state()|updated} + | {close, Reason :: atom()}. + +-type replies() :: reply() | [reply()]. + -define(TOKEN_MAXIMUM, 4294967295). + -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). + -define(DEF_IDLE_TIME, timer:seconds(30)). -define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- +-spec info(channel()) -> emqx_types:infos(). info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). +-spec info(list(atom())|atom(), channel()) -> term(). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; -info(conn_state, #channel{conn_state = CState}) -> - CState; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_misc:maybe_apply(fun emqx_session:info/1, Session); + emqx_misc:maybe_apply(fun emqx_coap_session:info/1, Session); info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; info(ctx, #channel{ctx = Ctx}) -> Ctx. +-spec stats(channel()) -> emqx_types:stats(). stats(_) -> []. +-spec init(map(), map()) -> channel(). init(ConnInfoT = #{peername := {PeerHost, _}, sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> @@ -126,8 +144,8 @@ init(ConnInfoT = #{peername := {PeerHost, _}, } ), - %% because it is possible to disconnect after init, and then trigger the $event.disconnected hook - %% and these two fields are required in the hook + %% because it is possible to disconnect after init, and then trigger the + %% $event.disconnected hook and these two fields are required in the hook ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>}, Heartbeat = ?GET_IDLE_TIME(Config), @@ -144,13 +162,19 @@ init(ConnInfoT = #{peername := {PeerHost, _}, validator(Type, Topic, Ctx, ClientInfo) -> emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic). --spec send_request(pid(), emqx_coap_message()) -> any(). +-spec send_request(pid(), coap_message()) -> any(). send_request(Channel, Request) -> gen_server:send_request(Channel, {?FUNCTION_NAME, Request}). %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- + +-spec handle_in(coap_message() | {frame_error, any()}, channel()) + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()} + | {shutdown, Reason :: term(), replies(), channel()}. handle_in(Msg, ChannleT) -> Channel = ensure_keepalive_timer(ChannleT), case emqx_coap_message:is_request(Msg) of @@ -170,6 +194,7 @@ handle_deliver(Delivers, #channel{session = Session, %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- + handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) -> case emqx_keepalive:check(NewVal, KeepAlive) of {ok, NewKeepAlive} -> @@ -191,10 +216,72 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- + +-spec(handle_call(Req :: term(), From :: term(), channel()) + -> {reply, Reply :: term(), channel()} + | {reply, Reply :: term(), replies(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), coap_message(), channel()}). handle_call({send_request, Msg}, From, Channel) -> Result = call_session(handle_out, {{send_request, From}, Msg}, Channel), erlang:setelement(1, Result, noreply); +handle_call({subscribe, Topic, SubOpts}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{clientid := ClientId, + mountpoint := Mountpoint}, + session = Session}) -> + Token = maps:get(token, + maps:get(sub_props, SubOpts, #{}), + <<>>), + NSubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + SubOpts), + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + _ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts), + + _ = run_hooks(Ctx, 'session.subscribed', + [ClientInfo, MountedTopic, NSubOpts]), + %% modifty session state + SubReq = {Topic, Token}, + TempMsg = #coap_message{type = non}, + Result = emqx_coap_session:process_subscribe( + SubReq, TempMsg, #{}, Session), + NSession = maps:get(session, Result), + {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}}; + +handle_call({unsubscribe, Topic}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{mountpoint := Mountpoint}, + session = Session}) -> + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + ok = emqx_broker:unsubscribe(MountedTopic), + _ = run_hooks(Ctx, 'session.unsubscribe', + [ClientInfo, MountedTopic, #{}]), + + %% modifty session state + UnSubReq = Topic, + TempMsg = #coap_message{type = non}, + Result = emqx_coap_session:process_subscribe( + UnSubReq, TempMsg, #{}, Session), + NSession = maps:get(session, Result), + {reply, ok, Channel#channel{session = NSession}}; + +handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> + Subs = emqx_coap_session:info(subscriptions, Session), + {reply, {ok, maps:to_list(Subs)}, Channel}; + +handle_call(kick, _From, Channel) -> + NChannel = ensure_disconnected(kicked, Channel), + shutdown_and_reply(kicked, ok, NChannel); + +handle_call(discard, _From, Channel) -> + shutdown_and_reply(discarded, ok, Channel); + handle_call(Req, _From, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, Channel}. @@ -202,6 +289,9 @@ handle_call(Req, _From, Channel) -> %%-------------------------------------------------------------------- %% Handle Cast %%-------------------------------------------------------------------- + +-spec handle_cast(Req :: term(), channel()) + -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. handle_cast(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {ok, Channel}. @@ -209,9 +299,9 @@ handle_cast(Req, Channel) -> %%-------------------------------------------------------------------- %% Handle Info %%-------------------------------------------------------------------- -handle_info({subscribe, _}, Channel) -> - {ok, Channel}; +-spec(handle_info(Info :: term(), channel()) + -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}). handle_info(Info, Channel) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. @@ -352,15 +442,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), {ok, ClientInfo#{mountpoint := Mountpoint1}}. -ensure_connected(Channel = #channel{ctx = Ctx, - conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond) - }, - ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]), - Channel#channel{conninfo = NConnInfo}. - process_connect(#channel{ctx = Ctx, session = Session, conninfo = ConnInfo, @@ -401,6 +482,21 @@ run_hooks(Ctx, Name, Args, Acc) -> metrics_inc(Name, Ctx) -> emqx_gateway_ctx:metrics_inc(Ctx, Name). +%%-------------------------------------------------------------------- +%% Ensure connected + +ensure_connected(Channel = #channel{ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond) + }, + _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]), + ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), + Channel#channel{conninfo = NConnInfo, conn_state = connected}. + +%%-------------------------------------------------------------------- +%% Ensure disconnected + ensure_disconnected(Reason, Channel = #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -409,9 +505,16 @@ ensure_disconnected(Reason, Channel = #channel{ ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. +shutdown_and_reply(Reason, Reply, Channel) -> + {shutdown, Reason, Reply, Channel}. + +%shutdown_and_reply(Reason, Reply, OutPkt, Channel) -> +% {shutdown, Reason, Reply, OutPkt, Channel}. + %%-------------------------------------------------------------------- %% Call Chain %%-------------------------------------------------------------------- + call_session(Fun, Msg, #channel{session = Session} = Channel) -> Result = emqx_coap_session:Fun(Msg, Session), handle_result(Result, Channel). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl index e5305b417..a567ea7f4 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl @@ -18,17 +18,15 @@ -behaviour(emqx_gateway_frame). -%% emqx_gateway_frame API +%% emqx_gateway_frame callbacks -export([ initial_parse_state/1 , serialize_opts/0 , serialize_pkt/2 , parse/2 , format/1 , type/1 - , is_message/1]). - -%% API --export([]). + , is_message/1 + ]). -include("include/emqx_coap.hrl"). -include("apps/emqx/include/types.hrl"). @@ -58,9 +56,11 @@ %% API %%-------------------------------------------------------------------- +-spec initial_parse_state(map()) -> emqx_gateway_frame:parse_state(). initial_parse_state(_) -> #{}. +-spec serialize_opts() -> emqx_gateway_frame:serialize_options(). serialize_opts() -> #{}. @@ -235,6 +235,9 @@ method_to_class_code(Method) -> %%-------------------------------------------------------------------- %% parse %%-------------------------------------------------------------------- + +-spec parse(binary(), emqx_gateway_frame:parse_state()) + -> emqx_gateway_frame:parse_result(). parse(<>, ParseState) -> {ok, #coap_message{ type = decode_type(Type) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_medium.erl b/apps/emqx_gateway/src/coap/emqx_coap_medium.erl index 8dafc7bbb..020e38496 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_medium.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_medium.erl @@ -53,8 +53,8 @@ out(Msg, Result) -> proto_out(Proto) -> proto_out(Proto, #{}). -proto_out(Proto, Resut) -> - Resut#{proto => Proto}. +proto_out(Proto, Result) -> + Result#{proto => Proto}. reply(Method, Req) when not is_record(Method, coap_message) -> reply(Method, <<>>, Req); diff --git a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl index 20473322e..dbec52d2b 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl @@ -23,7 +23,6 @@ -define(MAX_SEQ_ID, 16777215). --type topic() :: binary(). -type token() :: binary(). -type seq_id() :: 0 .. ?MAX_SEQ_ID. @@ -31,7 +30,7 @@ , seq_id := seq_id() }. --type manager() :: #{topic => res()}. +-type manager() :: #{emqx_types:topic() => res()}. %%-------------------------------------------------------------------- %% API @@ -40,7 +39,7 @@ new_manager() -> #{}. --spec insert(topic(), token(), manager()) -> {seq_id(), manager()}. +-spec insert(emqx_types:topic(), token(), manager()) -> {seq_id(), manager()}. insert(Topic, Token, Manager) -> Res = case maps:get(Topic, Manager, undefined) of undefined -> @@ -50,11 +49,11 @@ insert(Topic, Token, Manager) -> end, {maps:get(seq_id, Res), Manager#{Topic => Res}}. --spec remove(topic(), manager()) -> manager(). +-spec remove(emqx_types:topic(), manager()) -> manager(). remove(Topic, Manager) -> maps:remove(Topic, Manager). --spec res_changed(topic(), manager()) -> undefined | {token(), seq_id(), manager()}. +-spec res_changed(emqx_types:topic(), manager()) -> undefined | {token(), seq_id(), manager()}. res_changed(Topic, Manager) -> case maps:get(Topic, Manager, undefined) of undefined -> @@ -73,6 +72,7 @@ foreach(F, Manager) -> Manager), ok. +-spec subscriptions(manager()) -> [emqx_types:topic()]. subscriptions(Manager) -> maps:keys(Manager). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 13f1be240..9c3a8c451 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -22,7 +22,8 @@ %% API -export([ new/0 - , process_subscribe/4]). + , process_subscribe/4 + ]). -export([ info/1 , info/2 @@ -90,7 +91,8 @@ info(Session) -> info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{observe_manager = OM}) -> - emqx_coap_observe_res:subscriptions(OM); + Topics = emqx_coap_observe_res:subscriptions(OM), + lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics); info(subscriptions_cnt, #session{observe_manager = OM}) -> erlang:length(emqx_coap_observe_res:subscriptions(OM)); info(subscriptions_max, _) -> diff --git a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl index b5e4deb7f..d6d05fd40 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl @@ -63,7 +63,7 @@ -type event_result(State) :: #{next => State, - outgoing => emqx_coap_message(), + outgoing => coap_message(), timeouts => list(ttimeout()), has_sub => undefined | sub_register(), transport => emqx_coap_transport:transprot()}. @@ -75,12 +75,13 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- + +-spec new() -> manager(). new() -> #{ seq_id => 1 , next_msg_id => rand:uniform(?MAX_MESSAGE_ID) }. -%% client request handle_request(#coap_message{id = MsgId} = Msg, TM) -> Id = {in, MsgId}, case find_machine(Id, TM) of @@ -296,7 +297,7 @@ new_in_machine(MachineId, #{seq_id := SeqId} = Manager) -> SeqId => Machine, MachineId => SeqId}}. --spec new_out_machine(state_machine_key(), any(), emqx_coap_message(), manager()) -> +-spec new_out_machine(state_machine_key(), any(), coap_message(), manager()) -> {state_machine(), manager()}. new_out_machine(MachineId, Ctx, diff --git a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl index 2e858a2e1..07e522309 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl @@ -11,7 +11,7 @@ -type request_context() :: any(). --record(transport, { cache :: undefined | emqx_coap_message() +-record(transport, { cache :: undefined | coap_message() , req_context :: request_context() , retry_interval :: non_neg_integer() , retry_count :: non_neg_integer() @@ -26,7 +26,6 @@ -export_type([transport/0]). --import(emqx_coap_message, [reset/1]). -import(emqx_coap_medium, [ empty/0, reset/2, proto_out/2 , out/1, out/2, proto_out/1 , reply/2]). @@ -166,7 +165,7 @@ observe(in, {error, _} -> #{next => stop}; _ -> - reset(Message) + emqx_coap_message:reset(Message) end. until_stop(_, _, _) -> @@ -187,5 +186,5 @@ on_response(#coap_message{type = Type} = Message, out(Ack, #{next => NextState, transport => Transport#transport{cache = Ack}})); true -> - reset(Message) + emqx_coap_message:reset(Message) end. diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index 76d065523..608eae92a 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -28,7 +28,7 @@ -define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}). -define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}). --define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}). +-define(SUBOPTS, #{qos => 0, rh => 1, rap => 0, nl => 0, is_new => false}). %% TODO maybe can merge this code into emqx_coap_session, simplify the call chain @@ -146,7 +146,7 @@ subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) -> SubOpts = get_sub_opts(Msg), MountTopic = mount(CInfo, Topic), emqx_broker:subscribe(MountTopic, ClientId, SubOpts), - run_hooks(Ctx, 'session.subscribed', [CInfo, Topic, SubOpts]), + run_hooks(Ctx, 'session.subscribed', [CInfo, MountTopic, SubOpts]), ?SUB(MountTopic, Token, Msg); _ -> reply({error, unauthorized}, Msg) diff --git a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl index d47dd17fd..3ec37cc4a 100644 --- a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl +++ b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl @@ -73,4 +73,4 @@ , options = #{} , payload = <<>>}). --type emqx_coap_message() :: #coap_message{}. +-type coap_message() :: #coap_message{}. diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 96cc5d4ae..1992104a0 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -16,8 +16,6 @@ -module(emqx_gateway). --behaviour(emqx_config_handler). - -include("include/emqx_gateway.hrl"). %% Gateway APIs diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 7bf52b4ec..3ee209f19 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -497,11 +497,11 @@ examples_gateway_confs() -> , auto_observe => false , update_msg_publish_condition => <<"always">> , translators => - #{ command => #{topic => <<"/dn/#">>} - , response => #{topic => <<"/up/resp">>} - , notify => #{topic => <<"/up/notify">>} - , register => #{topic => <<"/up/resp">>} - , update => #{topic => <<"/up/resp">>} + #{ command => #{topic => <<"dn/#">>} + , response => #{topic => <<"up/resp">>} + , notify => #{topic => <<"up/notify">>} + , register => #{topic => <<"up/resp">>} + , update => #{topic => <<"up/resp">>} } , listeners => [ #{ type => <<"udp">> @@ -599,11 +599,11 @@ examples_update_gateway_confs() -> , auto_observe => false , update_msg_publish_condition => <<"always">> , translators => - #{ command => #{topic => <<"/dn/#">>} - , response => #{topic => <<"/up/resp">>} - , notify => #{topic => <<"/up/notify">>} - , register => #{topic => <<"/up/resp">>} - , update => #{topic => <<"/up/resp">>} + #{ command => #{topic => <<"dn/#">>} + , response => #{topic => <<"up/resp">>} + , notify => #{topic => <<"up/notify">>} + , register => #{topic => <<"up/resp">>} + , update => #{topic => <<"up/resp">>} } } } diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl index 701890633..aa56e6fdd 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl @@ -194,8 +194,8 @@ schema("/gateway/:name/authentication/users") -> , responses => ?STANDARD_RESP( #{ 200 => emqx_dashboard_swagger:schema_with_example( - ref(emqx_authn_api, response_user), - emqx_authn_api:response_user_examples()) + ref(emqx_authn_api, response_users), + emqx_authn_api:response_users_example()) }) }, post => diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 9fe36d25e..69a06be61 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -87,8 +87,7 @@ paths() -> , {<<"lte_lifetime">>, timestamp} ]). --define(query_fun, {?MODULE, query}). --define(format_fun, {?MODULE, format_channel_info}). +-define(QUERY_FUN, {?MODULE, query}). clients(get, #{ bindings := #{name := Name0} , query_string := Params @@ -99,14 +98,14 @@ clients(get, #{ bindings := #{name := Name0} undefined -> Response = emqx_mgmt_api:cluster_query( Params, TabName, - ?CLIENT_QS_SCHEMA, ?query_fun), + ?CLIENT_QS_SCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response); Node1 -> Node = binary_to_atom(Node1, utf8), ParamsWithoutNode = maps:without([<<"node">>], Params), Response = emqx_mgmt_api:node_query( Node, ParamsWithoutNode, - TabName, ?CLIENT_QS_SCHEMA, ?query_fun), + TabName, ?CLIENT_QS_SCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response) end end). @@ -148,6 +147,10 @@ subscriptions(get, #{ bindings := #{name := Name0, ClientId = emqx_mgmt_util:urldecode(ClientId0), with_gateway(Name0, fun(GwName, _) -> case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of + {error, nosupport} -> + return_http_error(405, <<"Not support to list subscriptions">>); + {error, noimpl} -> + return_http_error(501, <<"Not implemented now">>); {error, Reason} -> return_http_error(500, Reason); {ok, Subs} -> @@ -165,13 +168,21 @@ subscriptions(post, #{ bindings := #{name := Name0, case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of {undefined, _} -> return_http_error(400, "Miss topic property"); - {Topic, QoS} -> + {Topic, SubOpts} -> case emqx_gateway_http:client_subscribe( - GwName, ClientId, Topic, QoS) of + GwName, ClientId, Topic, SubOpts) of + {error, nosupport} -> + return_http_error( + 405, + <<"Not support to add a subscription">>); + {error, noimpl} -> + return_http_error( + 501, + <<"Not implemented now">>); {error, Reason} -> return_http_error(404, Reason); - ok -> - {204} + {ok, {NTopic, NSubOpts}}-> + {201, maps:merge(NSubOpts, #{topic => NTopic})} end end end); @@ -193,12 +204,16 @@ subscriptions(delete, #{ bindings := #{name := Name0, %% Utils subopts(Req) -> - #{ qos => maps:get(<<"qos">>, Req, 0) - , rap => maps:get(<<"rap">>, Req, 0) - , nl => maps:get(<<"nl">>, Req, 0) - , rh => maps:get(<<"rh">>, Req, 0) - , sub_props => extra_sub_props(maps:get(<<"sub_props">>, Req, #{})) - }. + SubOpts = #{ qos => maps:get(<<"qos">>, Req, 0) + , rap => maps:get(<<"rap">>, Req, 0) + , nl => maps:get(<<"nl">>, Req, 0) + , rh => maps:get(<<"rh">>, Req, 1) + }, + SubProps = extra_sub_props(maps:get(<<"sub_props">>, Req, #{})), + case maps:size(SubProps) of + 0 -> SubOpts; + _ -> maps:put(sub_props, SubProps, SubOpts) + end. extra_sub_props(Props) -> maps:filter( @@ -444,8 +459,7 @@ schema("/gateway/:name/clients/:clientid/subscriptions") -> , post => #{ description => <<"Create a subscription membership">> , parameters => params_client_insta() - %% FIXME: - , requestBody => emqx_dashboard_swagger:schema_with_examples( + , 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(subscription), examples_subsctiption()) , responses => @@ -878,5 +892,4 @@ example_general_subscription() -> , nl => 0 , rap => 0 , rh => 0 - , sub_props => #{} }. diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 351093e0f..987c9720b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -17,6 +17,8 @@ %% @doc The gateway configuration management module -module(emqx_gateway_conf). +-behaviour(emqx_config_handler). + %% Load/Unload -export([ load/0 , unload/0 @@ -270,7 +272,7 @@ ret_gw(GwName, {ok, #{raw_config := GwConf}}) -> lists:map(fun({LName, LConf}) -> do_convert_listener2(GwName, LType, LName, LConf) end, maps:to_list(SubConf)), - [NLConfs|Acc] + [NLConfs | Acc] end, [], maps:to_list(LsConf)), {ok, maps:merge(GwConf1, #{<<"listeners">> => NLsConf})}; ret_gw(_GwName, Err) -> Err. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 434a0bc49..810a79987 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -235,7 +235,7 @@ confexp({error, already_exist}) -> %%-------------------------------------------------------------------- -spec lookup_client(gateway_name(), - emqx_type:clientid(), {atom(), atom()}) -> list(). + emqx_types:clientid(), {atom(), atom()}) -> list(). lookup_client(GwName, ClientId, FormatFun) -> lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]). @@ -253,7 +253,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> rpc_call(Node, lookup_client, [Node, GwName, {clientid, ClientId}, FormatFun]). --spec kickout_client(gateway_name(), emqx_type:clientid()) +-spec kickout_client(gateway_name(), emqx_types:clientid()) -> {error, any()} | ok. kickout_client(GwName, ClientId) -> @@ -270,25 +270,28 @@ kickout_client(Node, GwName, ClientId) when Node =:= node() -> kickout_client(Node, GwName, ClientId) -> rpc_call(Node, kickout_client, [Node, GwName, ClientId]). --spec list_client_subscriptions(gateway_name(), emqx_type:clientid()) +-spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) -> {error, any()} | {ok, list()}. list_client_subscriptions(GwName, ClientId) -> - %% Get the subscriptions from session-info with_channel(GwName, ClientId, fun(Pid) -> - Subs = emqx_gateway_conn:call( - Pid, - subscriptions, ?DEFAULT_CALL_TIMEOUT), - {ok, lists:map(fun({Topic, SubOpts}) -> - SubOpts#{topic => Topic} - end, Subs)} + case emqx_gateway_conn:call( + Pid, + subscriptions, ?DEFAULT_CALL_TIMEOUT) of + {ok, Subs} -> + {ok, lists:map(fun({Topic, SubOpts}) -> + SubOpts#{topic => Topic} + end, Subs)}; + {error, Reason} -> + {error, Reason} + end end). --spec client_subscribe(gateway_name(), emqx_type:clientid(), - emqx_type:topic(), emqx_type:subopts()) +-spec client_subscribe(gateway_name(), emqx_types:clientid(), + emqx_types:topic(), emqx_types:subopts()) -> {error, any()} - | ok. + | {ok, {emqx_types:topic(), emqx_types:subopts()}}. client_subscribe(GwName, ClientId, Topic, SubOpts) -> with_channel(GwName, ClientId, fun(Pid) -> @@ -299,7 +302,7 @@ client_subscribe(GwName, ClientId, Topic, SubOpts) -> end). -spec client_unsubscribe(gateway_name(), - emqx_type:clientid(), emqx_type:topic()) + emqx_types:clientid(), emqx_types:topic()) -> {error, any()} | ok. client_unsubscribe(GwName, ClientId, Topic) -> @@ -330,7 +333,9 @@ return_http_error(Code, Msg) -> codestr(400) -> 'BAD_REQUEST'; codestr(401) -> 'NOT_SUPPORTED_NOW'; codestr(404) -> 'RESOURCE_NOT_FOUND'; -codestr(500) -> 'UNKNOW_ERROR'. +codestr(405) -> 'METHOD_NOT_ALLOWED'; +codestr(500) -> 'UNKNOW_ERROR'; +codestr(501) -> 'NOT_IMPLEMENTED'. -spec with_authn(binary(), function()) -> any(). with_authn(GwName0, Fun) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 18b195c5b..dfdf6ea2a 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -449,7 +449,7 @@ it has two purposes: sc(ref(clientinfo_override), #{ desc => "" })} - , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication_schema()} + , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()} ]. common_listener_opts() -> @@ -468,7 +468,7 @@ common_listener_opts() -> sc(integer(), #{ default => 1000 })} - , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication_schema()} + , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()} , {mountpoint, sc(binary(), #{ default => undefined diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 5d17fe6ca..fa74f9437 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -321,7 +321,7 @@ default_udp_options() -> [binary]. default_subopts() -> - #{rh => 0, %% Retain Handling + #{rh => 1, %% Retain Handling rap => 0, %% Retain as Publish nl => 0, %% No Local qos => 0, %% QoS diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 58b497cfc..bb8072358 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -334,13 +334,14 @@ handle_call({subscribe_from_client, TopicFilter, Qos}, _From, deny -> {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel}; _ -> - {ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), + {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), {reply, ok, NChannel} end; handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> - {ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel), - {reply, ok, NChannel}; + {ok, + [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel), + {reply, {ok, {NTopicFilter, NSubOpts}}, NChannel}; handle_call({unsubscribe_from_client, TopicFilter}, _From, Channel = #channel{conn_state = connected}) -> @@ -351,6 +352,9 @@ handle_call({unsubscribe, Topic}, _From, Channel) -> {ok, NChannel} = do_unsubscribe([Topic], Channel), {reply, ok, NChannel}; +handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> + {reply, {ok, maps:to_list(Subs)}, Channel}; + handle_call({publish, Topic, Qos, Payload}, _From, Channel = #channel{ ctx = Ctx, @@ -369,7 +373,10 @@ handle_call({publish, Topic, Qos, Payload}, _From, end; handle_call(kick, _From, Channel) -> - {shutdown, kicked, ok, Channel}; + {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)}; + +handle_call(discard, _From, Channel) -> + {shutdown, discarded, ok, Channel}; handle_call(Req, _From, Channel) -> ?SLOG(warning, #{ msg => "unexpected_call" @@ -431,11 +438,12 @@ terminate(Reason, Channel) -> %%-------------------------------------------------------------------- do_subscribe(TopicFilters, Channel) -> - NChannel = lists:foldl( - fun({TopicFilter, SubOpts}, ChannelAcc) -> - do_subscribe(TopicFilter, SubOpts, ChannelAcc) - end, Channel, parse_topic_filters(TopicFilters)), - {ok, NChannel}. + {MadeSubs, NChannel} = lists:foldl( + fun({TopicFilter, SubOpts}, {MadeSubs, ChannelAcc}) -> + {Sub, Channel1} = do_subscribe(TopicFilter, SubOpts, ChannelAcc), + {MadeSubs ++ [Sub], Channel1} + end, {[], Channel}, parse_topic_filters(TopicFilters)), + {ok, MadeSubs, NChannel}. %% @private do_subscribe(TopicFilter, SubOpts, Channel = @@ -445,17 +453,20 @@ do_subscribe(TopicFilter, SubOpts, Channel = NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), SubId = maps:get(clientid, ClientInfo, undefined), + %% XXX: is_new? IsNew = not maps:is_key(NTopicFilter, Subs), case IsNew of true -> ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), ok = emqx_hooks:run('session.subscribed', [ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]), - Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}; + {{NTopicFilter, NSubOpts}, + Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}}; _ -> %% Update subopts ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), - Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}} + {{NTopicFilter, NSubOpts}, + Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}} end. do_unsubscribe(TopicFilters, Channel) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index f3c64d678..c01c6adc5 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -51,11 +51,26 @@ clientinfo :: emqx_types:clientinfo(), %% Session session :: emqx_lwm2m_session:session() | undefined, + %% Channl State + %% TODO: is there need + conn_state :: conn_state(), %% Timer timers :: #{atom() => disable | undefined | reference()}, + %% FIXME: don't store anonymouse func with_context :: function() }). +-type channel() :: #channel{}. + +-type conn_state() :: idle | connecting | connected | disconnected. + +-type reply() :: {outgoing, coap_message()} + | {outgoing, [coap_message()]} + | {event, conn_state()|updated} + | {close, Reason :: atom()}. + +-type replies() :: reply() | [reply()]. + %% TODO: -define(DEFAULT_OVERRIDE, #{ clientid => <<"">> %% Generate clientid by default @@ -79,8 +94,8 @@ info(Keys, Channel) when is_list(Keys) -> info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; -info(conn_state, _) -> - connected; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> @@ -125,15 +140,10 @@ init(ConnInfoT = #{peername := {PeerHost, _}, , clientinfo = ClientInfo , timers = #{} , session = emqx_lwm2m_session:new() - %% FIXME: don't store anonymouse func + , conn_state = idle , with_context = with_context(Ctx, ClientInfo) }. -with_context(Ctx, ClientInfo) -> - fun(Type, Topic) -> - with_context(Type, Topic, Ctx, ClientInfo) - end. - lookup_cmd(Channel, Path, Action) -> gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}). @@ -143,9 +153,15 @@ send_cmd(Channel, Cmd) -> %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- -handle_in(Msg, ChannleT) -> - Channel = update_life_timer(ChannleT), - call_session(handle_coap_in, Msg, Channel). + +-spec handle_in(coap_message() | {frame_error, any()}, channel()) + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()} + | {shutdown, Reason :: term(), replies(), channel()}. +handle_in(Msg, Channle) -> + NChannel = update_life_timer(Channle), + call_session(handle_coap_in, Msg, NChannel). %%-------------------------------------------------------------------- %% Handle Delivers from broker to client @@ -174,7 +190,9 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- -handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) -> + +handle_call({lookup_cmd, Path, Type}, _From, + Channel = #channel{session = Session}) -> Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session), {reply, {ok, Result}, Channel}; @@ -182,6 +200,66 @@ handle_call({send_cmd, Cmd}, _From, Channel) -> {ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel), {reply, ok, Outs, Channel2}; +handle_call({subscribe, Topic, SubOpts}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{clientid := ClientId, + mountpoint := Mountpoint}, + session = Session}) -> + NSubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + SubOpts), + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + _ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts), + + _ = run_hooks(Ctx, 'session.subscribed', + [ClientInfo, MountedTopic, NSubOpts]), + %% modifty session state + Subs = emqx_lwm2m_session:info(subscriptions, Session), + NSubs = maps:put(MountedTopic, NSubOpts, Subs), + NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session), + {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}}; + +handle_call({unsubscribe, Topic}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{mountpoint := Mountpoint}, + session = Session}) -> + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + ok = emqx_broker:unsubscribe(MountedTopic), + _ = run_hooks(Ctx, 'session.unsubscribe', + [ClientInfo, MountedTopic, #{}]), + %% modifty session state + Subs = emqx_lwm2m_session:info(subscriptions, Session), + NSubs = maps:remove(MountedTopic, Subs), + NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session), + {reply, ok, Channel#channel{session = NSession}}; + +handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> + Subs = maps:to_list(emqx_lwm2m_session:info(subscriptions, Session)), + {reply, {ok, Subs}, Channel}; + +handle_call(kick, _From, Channel) -> + NChannel = ensure_disconnected(kicked, Channel), + shutdown_and_reply(kicked, ok, NChannel); + +handle_call(discard, _From, Channel) -> + shutdown_and_reply(discarded, ok, Channel); + +%% TODO: No Session Takeover +%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) -> +% reply(Session, Channel#channel{takeover = true}); +% +%handle_call({takeover, 'end'}, _From, Channel = #channel{session = Session, +% pendings = Pendings}) -> +% ok = emqx_session:takeover(Session), +% %% TODO: Should not drain deliver here (side effect) +% Delivers = emqx_misc:drain_deliver(), +% AllPendings = lists:append(Delivers, Pendings), +% shutdown_and_reply(takenover, AllPendings, Channel); + handle_call(Req, _From, Channel) -> ?SLOG(error, #{ msg => "unexpected_call" , call => Req @@ -223,6 +301,41 @@ terminate(Reason, #channel{ctx = Ctx, %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% Ensure connected + +ensure_connected(Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]), + + NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, + ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected + }. + +%%-------------------------------------------------------------------- +%% Ensure disconnected + +ensure_disconnected(Reason, Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = run_hooks(Ctx, 'client.disconnected', + [ClientInfo, Reason, NConnInfo]), + Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. + +shutdown_and_reply(Reason, Reply, Channel) -> + {shutdown, Reason, Reply, Channel}. + +%shutdown_and_reply(Reason, Reply, OutPkt, Channel) -> +% {shutdown, Reason, Reply, OutPkt, Channel}. + set_peercert_infos(NoSSL, ClientInfo) when NoSSL =:= nossl; NoSSL =:= undefined -> @@ -319,6 +432,7 @@ enrich_clientinfo(#coap_message{options = Options} = Msg, Query = maps:get(uri_query, Options, #{}), case Query of #{<<"ep">> := Epn, <<"lt">> := Lifetime} -> + %% FIXME: the following keys is not belong standrad protocol Username = maps:get(<<"imei">>, Query, Epn), Password = maps:get(<<"password">>, Query, undefined), ClientId = maps:get(<<"device_id">>, Query, Epn), @@ -363,13 +477,6 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), {ok, ClientInfo#{mountpoint := Mountpoint1}}. -ensure_connected(Channel = #channel{ctx = Ctx, - conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]), - ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]), - Channel. - process_connect(Channel = #channel{ctx = Ctx, session = Session, conninfo = ConnInfo, @@ -418,29 +525,44 @@ gets([H | T], Map) -> gets([], Val) -> Val. +%%-------------------------------------------------------------------- +%% With Context + +with_context(Ctx, ClientInfo) -> + fun(Type, Topic) -> + with_context(Type, Topic, Ctx, ClientInfo) + end. + with_context(publish, [Topic, Msg], Ctx, ClientInfo) -> case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of allow -> - emqx:publish(Msg); + _ = emqx_broker:publish(Msg), + ok; _ -> ?SLOG(error, #{ msg => "publish_denied" , topic => Topic - }) + }), + {error, deny} end; -with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) -> +with_context(subscribe, [Topic, Opts], Ctx, ClientInfo) -> + #{clientid := ClientId, + endpoint_name := EndpointName} = ClientInfo, case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of allow -> run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]), ?SLOG(debug, #{ msg => "subscribe_topic_succeed" , topic => Topic - , endpoint_name => Username + , clientid => ClientId + , endpoint_name => EndpointName }), - emqx:subscribe(Topic, Username, Opts); + emqx_broker:subscribe(Topic, ClientId, Opts), + ok; _ -> ?SLOG(error, #{ msg => "subscribe_denied" , topic => Topic - }) + }), + {error, deny} end; with_context(metrics, Name, Ctx, _ClientInfo) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index 6def4874a..bee1bedcd 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -25,9 +25,11 @@ -export([ new/0, init/4, update/3, parse_object_list/1 , reregister/3, on_close/1, find_cmd_record/3]). +%% Info & Stats -export([ info/1 , info/2 , stats/1 + , stats/2 ]). -export([ handle_coap_in/3 @@ -37,12 +39,16 @@ , send_cmd/3 , set_reply/2]). +%% froce update subscriptions +-export([ set_subscriptions/2 + ]). + -export_type([session/0]). -type request_context() :: map(). -type timestamp() :: non_neg_integer(). --type queued_request() :: {timestamp(), request_context(), emqx_coap_message()}. +-type queued_request() :: {timestamp(), request_context(), coap_message()}. -type cmd_path() :: binary(). -type cmd_type() :: binary(). @@ -66,6 +72,7 @@ , last_active_at :: non_neg_integer() , created_at :: non_neg_integer() , cmd_record :: cmd_record() + , subscriptions :: map() }). -type session() :: #session{}. @@ -83,7 +90,9 @@ -define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}). %% steal from emqx_session --define(INFO_KEYS, [subscriptions, +-define(INFO_KEYS, [id, + is_persistent, + subscriptions, upgrade_qos, retry_interval, await_rel_timeout, @@ -99,7 +108,8 @@ mqueue_dropped, next_pkt_id, awaiting_rel_cnt, - awaiting_rel_max + awaiting_rel_max, + latency_stats ]). -define(OUT_LIST_KEY, out_list). @@ -118,9 +128,11 @@ new() -> , is_cache_mode = false , mountpoint = <<>> , cmd_record = #{queue => queue:new()} - , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}. + , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max]) + , subscriptions = #{} + }. --spec init(emqx_coap_message(), binary(), function(), session()) -> map(). +-spec init(coap_message(), binary(), function(), session()) -> map(). init(#coap_message{options = Opts, payload = Payload} = Msg, MountPoint, WithContext, Session) -> Query = maps:get(uri_query, Opts), @@ -152,7 +164,7 @@ update(Msg, WithContext, Session) -> on_close(Session) -> #{topic := Topic} = downlink_topic(), MountedTopic = mount(Topic, Session), - emqx:unsubscribe(MountedTopic), + emqx_broker:unsubscribe(MountedTopic), MountedTopic. -spec find_cmd_record(cmd_path(), cmd_type(), session()) -> cmd_result(). @@ -169,55 +181,56 @@ info(Session) -> info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; -info(location_path, #session{location_path = Path}) -> - Path; - -info(lifetime, #session{lifetime = LT}) -> - LT; - -info(reg_info, #session{reg_info = RI}) -> - RI; - -info(subscriptions, _) -> - []; -info(subscriptions_cnt, _) -> - 0; -info(subscriptions_max, _) -> - infinity; +info(id, _) -> + undefined; +info(is_persistent, _) -> + false; +info(subscriptions, #session{subscriptions = Subs}) -> + Subs; info(upgrade_qos, _) -> - ?QOS_0; -info(inflight, _) -> - emqx_inflight:new(); -info(inflight_cnt, _) -> - 0; -info(inflight_max, _) -> - 0; + false; info(retry_interval, _) -> - infinity; -info(mqueue, _) -> - emqx_mqueue:init(#{max_len => 0, store_qos0 => false}); -info(mqueue_len, #session{queue = Queue}) -> - queue:len(Queue); -info(mqueue_max, _) -> 0; -info(mqueue_dropped, _) -> - 0; -info(next_pkt_id, _) -> - 0; -info(awaiting_rel, _) -> - #{}; -info(awaiting_rel_cnt, _) -> - 0; -info(awaiting_rel_max, _) -> - infinity; info(await_rel_timeout, _) -> infinity; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt. + CreatedAt; +%% used for channel +info(location_path, #session{location_path = Path}) -> + Path; +info(lifetime, #session{lifetime = LT}) -> + LT; +info(reg_info, #session{reg_info = RI}) -> + RI. -%% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). -stats(Session) -> info(?STATS_KEYS, Session). +stats(Session) -> stats(?STATS_KEYS, Session). + +stats(Keys, Session) when is_list(Keys) -> + [{Key, stats(Key, Session)} || Key <- Keys]; + +stats(subscriptions_cnt, #session{subscriptions = Subs}) -> + maps:size(Subs); +stats(subscriptions_max, _) -> + infinity; +stats(inflight_cnt, _) -> + 0; +stats(inflight_max, _) -> + 0; +stats(mqueue_len, _) -> + 0; +stats(mqueue_max, _) -> + 0; +stats(mqueue_dropped, _) -> + 0; +stats(next_pkt_id, _) -> + 0; +stats(awaiting_rel_cnt, _) -> + 0; +stats(awaiting_rel_max, _) -> + infinity; +stats(latency_stats, _) -> + #{}. %%-------------------------------------------------------------------- %% API @@ -242,6 +255,9 @@ set_reply(Msg, #session{coap = Coap} = Session) -> send_cmd(Cmd, _, Session) -> return(send_cmd_impl(Cmd, Session)). +set_subscriptions(Subs, Session) -> + Session#session{subscriptions = Subs}. + %%-------------------------------------------------------------------- %% Protocol Stack %%-------------------------------------------------------------------- @@ -347,7 +363,7 @@ get_lifetime(#{<<"lt">> := _} = NewRegInfo, _) -> get_lifetime(_, OldRegInfo) -> get_lifetime(OldRegInfo). --spec update(emqx_coap_message(), function(), binary(), session()) -> map(). +-spec update(coap_message(), function(), binary(), session()) -> map(). update(#coap_message{options = Opts, payload = Payload} = Msg, WithContext, CmdType, @@ -377,7 +393,10 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) -> %% - subscribe to the downlink_topic and wait for commands #{topic := Topic, qos := Qos} = downlink_topic(), MountedTopic = mount(Topic, Session), - Session3 = subscribe(MountedTopic, Qos, WithContext, Session2), + SubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + #{qos => Qos}), + Session3 = do_subscribe(MountedTopic, SubOpts, WithContext, Session2), Session4 = send_dl_msg(Session3), %% - report the registration info @@ -387,22 +406,33 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) -> %%-------------------------------------------------------------------- %% Subscribe %%-------------------------------------------------------------------- + proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) -> #{topic := Topic, qos := Qos} = downlink_topic(), MountedTopic = mount(Topic, Session), - Session2 = case WaitAck of + SubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + #{qos => Qos}), + NSession = case WaitAck of undefined -> Session; Ctx -> - MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>), - send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, WithContext, Session) + MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt( + Ctx, <<"coap_timeout">>), + send_to_mqtt(Ctx, <<"coap_timeout">>, + MqttPayload, WithContext, Session) end, - subscribe(MountedTopic, Qos, WithContext, Session2). + do_subscribe(MountedTopic, SubOpts, WithContext, NSession). -subscribe(Topic, Qos, WithContext, Session) -> - Opts = get_sub_opts(Qos), - WithContext(subscribe, [Topic, Opts]), - Session. +do_subscribe(Topic, SubOpts, WithContext, + Session = #session{subscriptions = Subs}) -> + case WithContext(subscribe, [Topic, SubOpts]) of + {error, _} -> + Session; + ok -> + NSubs = maps:put(Topic, SubOpts, Subs), + Session#session{subscriptions = NSubs} + end. send_auto_observe(RegInfo, Session) -> %% - auto observe the objects @@ -449,15 +479,6 @@ deliver_auto_observe_to_coap(AlternatePath, TermData, Session) -> {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData), maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session). -get_sub_opts(Qos) -> - #{ - qos => Qos, - rap => 0, - nl => 0, - rh => 0, - is_new => false - }. - is_auto_observe() -> emqx:get_config([gateway, lwm2m, auto_observe]). @@ -609,7 +630,7 @@ proto_publish(Topic, Payload, Qos, Headers, WithContext, %% TODO: Append message metadata into headers Msg = emqx_message:make(Epn, Qos, MountedTopic, emqx_json:encode(Payload), #{}, Headers), - WithContext(publish, [MountedTopic, Msg]), + _ = WithContext(publish, [MountedTopic, Msg]), Session. mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) -> diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index f9cfa3cbe..74c8d2c12 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1194,8 +1194,8 @@ handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> 2 -> case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of - {ok, _, NChannel} -> - reply(ok, NChannel); + {ok, {_, NTopicName, NSubOpts}, NChannel} -> + reply({ok, {NTopicName, NSubOpts}}, NChannel); {error, ?SN_EXCEED_LIMITATION} -> reply({error, exceed_limitation}, Channel) end; @@ -1214,7 +1214,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) -> reply(ok, NChannel); handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> - reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel); + reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel); handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index c6570adfc..dfca57cf1 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -61,7 +61,7 @@ session :: undefined | map(), %% ClientInfo override specs clientinfo_override :: map(), - %% Connection Channel + %% Channel State conn_state :: conn_state(), %% Heartbeat heartbeat :: emqx_stomp_heartbeat:heartbeat(), @@ -73,16 +73,16 @@ transaction :: #{binary() => list()} }). --type(channel() :: #channel{}). +-type channel() :: #channel{}. --type(conn_state() :: idle | connecting | connected | disconnected). +-type conn_state() :: idle | connecting | connected | disconnected. --type(reply() :: {outgoing, stomp_frame()} +-type reply() :: {outgoing, stomp_frame()} | {outgoing, [stomp_frame()]} | {event, conn_state()|updated} - | {close, Reason :: atom()}). + | {close, Reason :: atom()}. --type(replies() :: reply() | [reply()]). +-type replies() :: reply() | [reply()]. -define(TIMER_TABLE, #{ incoming_timer => keepalive, @@ -155,7 +155,7 @@ setting_peercert_infos(Peercert, ClientInfo) -> info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). --spec(info(list(atom())|atom(), channel()) -> term()). +-spec info(list(atom())|atom(), channel()) -> term(). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; @@ -174,7 +174,7 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> info(ctx, #channel{ctx = Ctx}) -> Ctx. --spec(stats(channel()) -> emqx_types:stats()). +-spec stats(channel()) -> emqx_types:stats(). stats(#channel{subscriptions = Subs}) -> [{subscriptions_cnt, length(Subs)}]. @@ -294,9 +294,9 @@ ensure_connected(Channel = #channel{ clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{conninfo = NConnInfo, - conn_state = connected - }. + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected}. process_connect(Channel = #channel{ ctx = Ctx, @@ -660,7 +660,7 @@ handle_call({subscribe, Topic, SubOpts}, _From, ), NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts}|Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - reply(ok, NChannel1); + reply({ok, {MountedTopic, NSubOpts}}, NChannel1); {error, ErrMsg, NChannel} -> ?SLOG(error, #{ msg => "failed_to_subscribe_topic" , topic => Topic @@ -688,11 +688,11 @@ handle_call({unsubscribe, Topic}, _From, %% Reply :: [{emqx_types:topic(), emqx_types:subopts()}] handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> - Reply = lists:map( + NSubs = lists:map( fun({_SubId, Topic, _Ack, SubOpts}) -> {Topic, SubOpts} end, Subs), - reply(Reply, Channel); + reply({ok, NSubs}, Channel); handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index 179471127..6dd6860b7 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -395,7 +395,7 @@ t_rest_clienit_info(_) -> ?assertEqual(1, length(Subs)), assert_feilds_apperence([topic, qos], lists:nth(1, Subs)), - {204, _} = request(post, ClientPath ++ "/subscriptions", + {201, _} = request(post, ClientPath ++ "/subscriptions", #{topic => <<"t/a">>, qos => 1, sub_props => #{subid => <<"1001">>}}),