%%-------------------------------------------------------------------- %% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_lwm2m_channel). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). -include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl"). %% API -export([ info/1 , info/2 , stats/1 , with_context/2 , do_takeover/3 , lookup_cmd/3 , send_cmd/2 ]). -export([ init/2 , handle_in/2 , handle_deliver/2 , handle_timeout/3 , terminate/2 ]). -export([ handle_call/3 , handle_cast/2 , handle_info/2 ]). -record(channel, { %% Context ctx :: emqx_gateway_ctx:context(), %% Connection Info conninfo :: emqx_types:conninfo(), %% Client Info clientinfo :: emqx_types:clientinfo(), %% Session session :: emqx_lwm2m_session:session() | undefined, %% Timer timers :: #{atom() => disable | undefined | reference()}, with_context :: function() }). %% TODO: -define(DEFAULT_OVERRIDE, #{ clientid => <<"">> %% Generate clientid by default , username => <<"${Packet.uri_query.ep}">> , password => <<"">> }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; info(conn_state, _) -> connected; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> emqx_misc:maybe_apply(fun emqx_lwm2m_session:info/1, Session); info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; info(ctx, #channel{ctx = Ctx}) -> Ctx. stats(_) -> []. init(ConnInfo = #{peername := {PeerHost, _}, sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Config, undefined), ListenerId = case maps:get(listener, Config, undefined) of undefined -> undefined; {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName) end, ClientInfo = set_peercert_infos( Peercert, #{ zone => default , listener => ListenerId , protocol => lwm2m , peerhost => PeerHost , sockport => SockPort , username => undefined , clientid => undefined , is_bridge => false , is_superuser => false , mountpoint => Mountpoint } ), #channel{ ctx = Ctx , conninfo = ConnInfo , clientinfo = ClientInfo , timers = #{} , session = emqx_lwm2m_session:new() %% FIXME: don't store anonymouse func , with_context = with_context(Ctx, ClientInfo) }. with_context(Ctx, ClientInfo) -> fun(Type, Topic) -> with_context(Type, Topic, Ctx, ClientInfo) end. lookup_cmd(Channel, Path, Action) -> gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}). send_cmd(Channel, Cmd) -> gen_server:call(Channel, {?FUNCTION_NAME, Cmd}). %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- handle_in(Msg, ChannleT) -> Channel = update_life_timer(ChannleT), call_session(handle_coap_in, Msg, Channel). %%-------------------------------------------------------------------- %% Handle Delivers from broker to client %%-------------------------------------------------------------------- handle_deliver(Delivers, Channel) -> call_session(handle_deliver, Delivers, Channel). %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- handle_timeout(_, lifetime, #channel{ctx = Ctx, clientinfo = ClientInfo, conninfo = ConnInfo} = Channel) -> ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, timeout, ConnInfo]), {shutdown, timeout, Channel}; handle_timeout(_, {transport, _} = Msg, Channel) -> call_session(timeout, Msg, Channel); handle_timeout(_, disconnect, Channel) -> {shutdown, normal, Channel}; handle_timeout(_, _, Channel) -> {ok, Channel}. %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) -> Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session), {reply, {ok, Result}, Channel}; handle_call({send_cmd, Cmd}, _From, Channel) -> {ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel), {reply, ok, Outs, Channel2}; handle_call(Req, _From, Channel) -> ?SLOG(error, #{ msg => "unexpected_call" , call => Req }), {reply, ignored, Channel}. %%-------------------------------------------------------------------- %% Handle Cast %%-------------------------------------------------------------------- handle_cast(Req, Channel) -> ?SLOG(error, #{ msg => "unexpected_cast" , cast => Req }), {ok, Channel}. %%-------------------------------------------------------------------- %% Handle Info %%-------------------------------------------------------------------- handle_info({subscribe, _AutoSubs}, Channel) -> %% not need handle this message {ok, Channel}; handle_info(Info, Channel) -> ?SLOG(error, #{ msg => "unexpected_info" , info => Info }), {ok, Channel}. %%-------------------------------------------------------------------- %% Terminate %%-------------------------------------------------------------------- terminate(Reason, #channel{ctx = Ctx, clientinfo = ClientInfo, session = Session}) -> MountedTopic = emqx_lwm2m_session:on_close(Session), _ = run_hooks(Ctx, 'session.unsubscribe', [ClientInfo, MountedTopic, #{}]), run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]). %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- set_peercert_infos(NoSSL, ClientInfo) when NoSSL =:= nossl; NoSSL =:= undefined -> ClientInfo; set_peercert_infos(Peercert, ClientInfo) -> {DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)}, ClientInfo#{dn => DN, cn => CN}. make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) -> TRef = emqx_misc:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. update_life_timer(#channel{session = Session, timers = Timers} = Channel) -> LifeTime = emqx_lwm2m_session:info(lifetime, Session), _ = case maps:get(lifetime, Timers, undefined) of undefined -> ok; Ref -> erlang:cancel_timer(Ref) end, make_timer(lifetime, LifeTime, lifetime, Channel). check_location(Location, #channel{session = Session}) -> SLocation = emqx_lwm2m_session:info(location_path, Session), Location =:= SLocation. do_takeover(_DesireId, Msg, Channel) -> %% TODO completed the takeover, now only reset the message Reset = emqx_coap_message:reset(Msg), call_session(handle_out, Reset, Channel). do_connect(Req, Result, Channel, Iter) -> case emqx_misc:pipeline( [ fun check_lwm2m_version/2 , fun run_conn_hooks/2 , fun enrich_clientinfo/2 , fun set_log_meta/2 , fun auth_connect/2 ], Req, Channel) of {ok, _Input, #channel{session = Session, with_context = WithContext} = NChannel} -> case emqx_lwm2m_session:info(reg_info, Session) of undefined -> process_connect(ensure_connected(NChannel), Req, Result, Iter); _ -> NewResult = emqx_lwm2m_session:reregister(Req, WithContext, Session), iter(Iter, maps:merge(Result, NewResult), NChannel) end; {error, ReasonCode, NChannel} -> ErrMsg = io_lib:format("Login Failed: ~ts", [ReasonCode]), Payload = erlang:list_to_binary(lists:flatten(ErrMsg)), iter(Iter, reply({error, bad_request}, Payload, Req, Result), NChannel) end. check_lwm2m_version(#coap_message{options = Opts}, #channel{conninfo = ConnInfo} = Channel) -> Ver = gets([uri_query, <<"lwm2m">>], Opts), IsValid = case Ver of <<"1.0">> -> true; <<"1">> -> true; <<"1.1">> -> true; _ -> false end, if IsValid -> NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond) , proto_name => <<"LwM2M">> , proto_ver => Ver }, {ok, Channel#channel{conninfo = NConnInfo}}; true -> ?SLOG(error, #{ msg => "reject_REGISTRE_request" , reason => {unsupported_version, Ver} }), {error, "invalid lwm2m version", Channel} end. run_conn_hooks(Input, Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) -> ConnProps = #{}, case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of Error = {error, _Reason} -> Error; _NConnProps -> {ok, Input, Channel} end. enrich_clientinfo(#coap_message{options = Options} = Msg, Channel = #channel{clientinfo = ClientInfo0}) -> Query = maps:get(uri_query, Options, #{}), case Query of #{<<"ep">> := Epn, <<"lt">> := Lifetime} -> Username = maps:get(<<"imei">>, Query, Epn), Password = maps:get(<<"password">>, Query, undefined), ClientId = maps:get(<<"device_id">>, Query, Epn), ClientInfo = ClientInfo0#{endpoint_name => Epn, lifetime => binary_to_integer(Lifetime), username => Username, password => Password, clientid => ClientId}, {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo), {ok, Channel#channel{clientinfo = NClientInfo}}; _ -> ?SLOG(error, #{ msg => "reject_REGISTER_request" , reason => {wrong_paramters, Query} }), {error, "invalid queries", Channel} end. set_log_meta(_Input, #channel{clientinfo = #{clientid := ClientId}}) -> emqx_logger:set_metadata_clientid(ClientId), ok. auth_connect(_Input, Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> #{clientid := ClientId, username := Username} = ClientInfo, case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of {ok, NClientInfo} -> {ok, Channel#channel{clientinfo = NClientInfo, with_context = with_context(Ctx, ClientInfo)}}; {error, Reason} -> ?SLOG(warning, #{ msg => "client_login_failed" , clientid => ClientId , username => Username , reason => Reason }), {error, Reason} end. fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) -> {ok, ClientInfo}; fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), {ok, ClientInfo#{mountpoint := Mountpoint1}}. ensure_connected(Channel = #channel{ctx = Ctx, conninfo = ConnInfo, clientinfo = ClientInfo}) -> _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]), ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]), Channel. process_connect(Channel = #channel{ctx = Ctx, session = Session, conninfo = ConnInfo, clientinfo = ClientInfo, with_context = WithContext}, Msg, Result, Iter) -> %% inherit the old session SessFun = fun(_,_) -> #{} end, case emqx_gateway_ctx:open_session( Ctx, true, ClientInfo, ConnInfo, SessFun, emqx_lwm2m_session ) of {ok, _} -> Mountpoint = maps:get(mountpoint, ClientInfo, <<>>), NewResult0 = emqx_lwm2m_session:init( Msg, Mountpoint, WithContext, Session ), NewResult1 = NewResult0#{events => [{event, connected}]}, iter(Iter, maps:merge(Result, NewResult1), Channel); {error, Reason} -> ?SLOG(error, #{ msg => "falied_to_open_session" , reason => Reason }), iter(Iter, reply({error, bad_request}, Msg, Result), Channel) end. run_hooks(Ctx, Name, Args) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run(Name, Args). run_hooks(Ctx, Name, Args, Acc) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run_fold(Name, Args, Acc). gets(_, undefined) -> undefined; gets([H | T], Map) -> gets(T, maps:get(H, Map, undefined)); gets([], Val) -> Val. with_context(publish, [Topic, Msg], Ctx, ClientInfo) -> case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of allow -> emqx:publish(Msg); _ -> ?SLOG(error, #{ msg => "publish_denied" , topic => Topic }) end; with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) -> case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of allow -> run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]), ?SLOG(debug, #{ msg => "subscribe_topic_succeed" , topic => Topic , endpoint_name => Username }), emqx:subscribe(Topic, Username, Opts); _ -> ?SLOG(error, #{ msg => "subscribe_denied" , topic => Topic }) end; with_context(metrics, Name, Ctx, _ClientInfo) -> emqx_gateway_ctx:metrics_inc(Ctx, Name). %%-------------------------------------------------------------------- %% Call Chain %%-------------------------------------------------------------------- call_session(Fun, Msg, #channel{session = Session, with_context = WithContext} = Channel) -> iter([ session, fun process_session/4 , proto, fun process_protocol/4 , return, fun process_return/4 , lifetime, fun process_lifetime/4 , reply, fun process_reply/4 , out, fun process_out/4 , fun process_nothing/3 ], emqx_lwm2m_session:Fun(Msg, WithContext, Session), Channel). process_session(Session, Result, Channel, Iter) -> iter(Iter, Result, Channel#channel{session = Session}). process_protocol({request, Msg}, Result, Channel, Iter) -> #coap_message{method = Method} = Msg, handle_request_protocol(Method, Msg, Result, Channel, Iter); process_protocol(Msg, Result, #channel{with_context = WithContext, session = Session} = Channel, Iter) -> ProtoResult = emqx_lwm2m_session:handle_protocol_in(Msg, WithContext, Session), iter(Iter, maps:merge(Result, ProtoResult), Channel). handle_request_protocol(post, #coap_message{options = Opts} = Msg, Result, Channel, Iter) -> case Opts of #{uri_path := [?REG_PREFIX]} -> do_connect(Msg, Result, Channel, Iter); #{uri_path := Location} -> do_update(Location, Msg, Result, Channel, Iter); _ -> iter(Iter, reply({error, not_found}, Msg, Result), Channel) end; handle_request_protocol(delete, #coap_message{options = Opts} = Msg, Result, Channel, Iter) -> case Opts of #{uri_path := Location} -> case check_location(Location, Channel) of true -> Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), {shutdown, close, Reply, Channel}; _ -> iter(Iter, reply({error, not_found}, Msg, Result), Channel) end; _ -> iter(Iter, reply({error, bad_request}, Msg, Result), Channel) end. do_update(Location, Msg, Result, #channel{session = Session, with_context = WithContext} = Channel, Iter) -> case check_location(Location, Channel) of true -> NewResult = emqx_lwm2m_session:update(Msg, WithContext, Session), iter(Iter, maps:merge(Result, NewResult), Channel); _ -> iter(Iter, reply({error, not_found}, Msg, Result), Channel) end. process_return({Outs, Session}, Result, Channel, Iter) -> OldOuts = maps:get(out, Result, []), iter(Iter, Result#{out => Outs ++ OldOuts}, Channel#channel{session = Session}). process_out(Outs, Result, Channel, _) -> Outs2 = lists:reverse(Outs), Outs3 = case maps:get(reply, Result, undefined) of undefined -> Outs2; Reply -> [Reply | Outs2] end, Events = maps:get(events, Result, []), {ok, [{outgoing, Outs3}] ++ Events, Channel}. process_reply(Reply, Result, #channel{session = Session} = Channel, _) -> Session2 = emqx_lwm2m_session:set_reply(Reply, Session), Outs = maps:get(out, Result, []), Outs2 = lists:reverse(Outs), Events = maps:get(events, Result, []), {ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}. process_lifetime(_, Result, Channel, Iter) -> iter(Iter, Result, update_life_timer(Channel)). process_nothing(_, _, Channel) -> {ok, Channel}.