diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index b3b568271..6fdadcc3b 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -61,6 +61,9 @@ gateway.coap { heartbeat = 30s notify_type = qos + + ## if true, you need to establish a connection before use + connection_required = false subscribe_qos = qos0 publish_qos = qos1 @@ -128,7 +131,7 @@ gateway.lwm2m { enable_stats = true ## When publishing or subscribing, prefix all topics with a mountpoint string. - mountpoint = "lwm2m" + mountpoint = "lwm2m/%u" xml_dir = "{{ platform_etc_dir }}/lwm2m_xml" @@ -143,27 +146,27 @@ gateway.lwm2m { translators { command { - topic = "dn/#" + topic = "/dn/#" qos = 0 } response { - topic = "up/resp" + topic = "/up/resp" qos = 0 } notify { - topic = "up/notify" + topic = "/up/notify" qos = 0 } register { - topic = "up/resp" + topic = "/up/resp" qos = 0 } update { - topic = "up/resp" + topic = "/up/resp" qos = 0 } } diff --git a/apps/emqx_gateway/src/coap/README.md b/apps/emqx_gateway/src/coap/README.md index 88f657537..54f0fde84 100644 --- a/apps/emqx_gateway/src/coap/README.md +++ b/apps/emqx_gateway/src/coap/README.md @@ -414,21 +414,30 @@ Server will return token **X** in payload 2. Update Connection ``` -coap-client -m put -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&username=admin&password=public&token=X" +coap-client -m put -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&token=X" ``` 3. Publish ``` -coap-client -m post -e "Hellow" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public" +coap-client -m post -e "Hellow" "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public" ``` if you want to publish with auth, you must first establish a connection, and then post publish request on the same socket, so libcoap client can't simulation publish with a token +``` +coap-client -m post -e "Hellow" "coap://127.0.0.1/ps/coap/test?clientid=123&token=X" +``` + 4. Subscribe ``` coap-client -m get -s 60 -O 6,0x00 -o - -T "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&username=admin&password=public" ``` +**Or** +``` +coap-client -m get -s 60 -O 6,0x00 -o - -T "obstoken" "coap://127.0.0.1/ps/coap/test?clientid=123&token=X" +``` 5. Close Connection ``` -coap-client -m delete -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&username=admin&password=public&token=X" -``` \ No newline at end of file +coap-client -m delete -e "" "coap://127.0.0.1/mqtt/connection?clientid=123&token=X +``` + diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index 6e554b9ef..11aca8cc8 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -55,6 +55,8 @@ %% Timer timers :: #{atom() => disable | undefined | reference()}, + connection_required :: boolean(), + conn_state :: idle | connected, token :: binary() | undefined @@ -63,6 +65,8 @@ -type channel() :: #channel{}. -define(TOKEN_MAXIMUM, 4294967295). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). +-define(DEF_IDLE_TIME, timer:seconds(30)). +-define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). %%-------------------------------------------------------------------- @@ -110,13 +114,14 @@ init(ConnInfo = #{peername := {PeerHost, _}, } ), - Heartbeat = emqx:get_config([gateway, coap, idle_timeout]), + Heartbeat = ?GET_IDLE_TIME(Config), #channel{ ctx = Ctx , conninfo = ConnInfo , clientinfo = ClientInfo , timers = #{} , session = emqx_coap_session:new() , keepalive = emqx_keepalive:init(Heartbeat) + , connection_required = maps:get(connection_required, Config, false) , conn_state = idle }. @@ -216,13 +221,12 @@ make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) -> ensure_keepalive_timer(Channel) -> ensure_keepalive_timer(fun ensure_timer/4, Channel). -ensure_keepalive_timer(Fun, Channel) -> - Heartbeat = emqx:get_config([gateway, coap, idle_timeout]), +ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) -> + Heartbeat = emqx_keepalive:info(interval, KeepAlive), Fun(keepalive, Heartbeat, keepalive, Channel). -check_auth_state(Msg, Channel) -> - Enable = emqx:get_config([gateway, coap, enable_stats]), - check_token(Enable, Msg, Channel). +check_auth_state(Msg, #channel{connection_required = Required} = Channel) -> + check_token(Required, Msg, Channel). check_token(true, Msg, diff --git a/apps/emqx_gateway/src/coap/emqx_coap_medium.erl b/apps/emqx_gateway/src/coap/emqx_coap_medium.erl index ae5763179..8dafc7bbb 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_medium.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_medium.erl @@ -29,13 +29,14 @@ , reply/2, reply/3, reply/4]). %%-type result() :: map() | empty. --define(DEFINE_DEF(Name), Name(Msg) -> Name(Msg, #{})). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- empty() -> #{}. -?DEFINE_DEF(reset). +reset(Msg) -> + reset(Msg, #{}). reset(Msg, Result) -> out(emqx_coap_message:reset(Msg), Result). @@ -49,7 +50,8 @@ out(Msg, #{out := Outs} = Result) -> out(Msg, Result) -> Result#{out => [Msg]}. -?DEFINE_DEF(proto_out). +proto_out(Proto) -> + proto_out(Proto, #{}). proto_out(Proto, Resut) -> Resut#{proto => Proto}. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 45e338a36..c3e241389 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -83,6 +83,7 @@ fields(mqttsn_predefined) -> fields(coap_structs) -> [ {heartbeat, sc(duration(), undefined, <<"30s">>)} + , {connection_required, sc(boolean(), undefined, false)} , {notify_type, sc(union([non, con, qos]), undefined, qos)} , {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)} , {publish_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)} diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index b67032313..d0647897b 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -24,8 +24,7 @@ -export([ info/1 , info/2 , stats/1 - , validator/2 - , validator/4 + , with_context/2 , do_takeover/3]). -export([ init/2 @@ -53,11 +52,10 @@ %% Timer timers :: #{atom() => disable | undefined | reference()}, - validator :: function() + with_context :: function() }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). - -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). %%-------------------------------------------------------------------- @@ -109,16 +107,13 @@ init(ConnInfo = #{peername := {PeerHost, _}, , clientinfo = ClientInfo , timers = #{} , session = emqx_lwm2m_session:new() - , validator = validator(Ctx, ClientInfo) + , with_context = with_context(Ctx, ClientInfo) }. -validator(_Type, _Topic, _Ctx, _ClientInfo) -> - allow. - %emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic). -validator(Ctx, ClientInfo) -> +with_context(Ctx, ClientInfo) -> fun(Type, Topic) -> - validator(Type, Topic, Ctx, ClientInfo) + with_context(Type, Topic, Ctx, ClientInfo) end. %%-------------------------------------------------------------------- @@ -137,7 +132,10 @@ handle_deliver(Delivers, Channel) -> %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- -handle_timeout(_, lifetime, Channel) -> +handle_timeout(_, lifetime, #channel{ctx = Ctx, + clientinfo = ClientInfo, + conninfo = ConnInfo} = Channel) -> + ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, timeout, ConnInfo]), {shutdown, timeout, Channel}; handle_timeout(_, {transport, _} = Msg, Channel) -> @@ -166,6 +164,10 @@ handle_cast(Req, Channel) -> %%-------------------------------------------------------------------- %% Handle Info %%-------------------------------------------------------------------- +handle_info({subscribe, _AutoSubs}, Channel) -> + %% not need handle this message + {ok, Channel}; + handle_info(Info, Channel) -> ?LOG(error, "Unexpected info: ~p", [Info]), {ok, Channel}. @@ -173,8 +175,12 @@ handle_info(Info, Channel) -> %%-------------------------------------------------------------------- %% Terminate %%-------------------------------------------------------------------- -terminate(_Reason, #channel{session = Session}) -> - emqx_lwm2m_session:on_close(Session). +terminate(Reason, #channel{ctx = Ctx, + clientinfo = ClientInfo, + session = Session}) -> + MountedTopic = emqx_lwm2m_session:on_close(Session), + _ = run_hooks(Ctx, 'session.unsubscribe', [ClientInfo, MountedTopic, #{}]), + run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]). %%-------------------------------------------------------------------- %% Internal functions @@ -220,12 +226,12 @@ do_connect(Req, Result, Channel, Iter) -> Req, Channel) of {ok, _Input, #channel{session = Session, - validator = Validator} = NChannel} -> + with_context = WithContext} = NChannel} -> case emqx_lwm2m_session:info(reg_info, Session) of undefined -> process_connect(ensure_connected(NChannel), Req, Result, Iter); _ -> - NewResult = emqx_lwm2m_session:reregister(Req, Validator, Session), + NewResult = emqx_lwm2m_session:reregister(Req, WithContext, Session), iter(Iter, maps:merge(Result, NewResult), NChannel) end; {error, ReasonCode, NChannel} -> @@ -251,7 +257,7 @@ check_lwm2m_version(#coap_message{options = Opts}, end, if IsValid -> NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond) - , proto_name => <<"lwm2m">> + , proto_name => <<"LwM2M">> , proto_ver => Ver }, {ok, Channel#channel{conninfo = NConnInfo}}; @@ -274,7 +280,7 @@ enrich_clientinfo(#coap_message{options = Options} = Msg, Query = maps:get(uri_query, Options, #{}), case Query of #{<<"ep">> := Epn} -> - UserName = maps:get(<<"imei">>, Query, undefined), + UserName = maps:get(<<"imei">>, Query, Epn), Password = maps:get(<<"password">>, Query, undefined), ClientId = maps:get(<<"device_id">>, Query, Epn), ClientInfo = @@ -298,7 +304,7 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx, case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of {ok, NClientInfo} -> {ok, Channel#channel{clientinfo = NClientInfo, - validator = validator(Ctx, ClientInfo)}}; + with_context = with_context(Ctx, ClientInfo)}}; {error, Reason} -> ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p", [ClientId, Username, Reason]), @@ -308,14 +314,13 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx, fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) -> {ok, ClientInfo}; fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> - %% TODO: Enrich the varibale replacement???? - %% i.e: ${ClientInfo.auth_result.productKey} Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), {ok, ClientInfo#{mountpoint := Mountpoint1}}. ensure_connected(Channel = #channel{ctx = Ctx, conninfo = ConnInfo, clientinfo = ClientInfo}) -> + _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]), ok = run_hooks(Ctx, 'client.connected', [ClientInfo, ConnInfo]), Channel. @@ -323,7 +328,7 @@ process_connect(Channel = #channel{ctx = Ctx, session = Session, conninfo = ConnInfo, clientinfo = ClientInfo, - validator = Validator}, + with_context = WithContext}, Msg, Result, Iter) -> %% inherit the old session SessFun = fun(_,_) -> #{} end, @@ -336,7 +341,8 @@ process_connect(Channel = #channel{ctx = Ctx, emqx_lwm2m_session ) of {ok, _} -> - NewResult = emqx_lwm2m_session:init(Msg, Validator, Session), + Mountpoint = maps:get(mountpoint, ClientInfo, <<>>), + NewResult = emqx_lwm2m_session:init(Msg, Mountpoint, WithContext, Session), iter(Iter, maps:merge(Result, NewResult), Channel); {error, Reason} -> ?LOG(error, "Failed to open session du to ~p", [Reason]), @@ -358,13 +364,34 @@ gets([H | T], Map) -> gets([], Val) -> Val. +with_context(publish, [Topic, Msg], Ctx, ClientInfo) -> + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of + allow -> + emqx:publish(Msg); + _ -> + ?LOG(error, "topic:~p not allow to publish ", [Topic]) + end; + +with_context(subscribe, [Topic, Opts], Ctx, #{username := UserName} = ClientInfo) -> + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of + allow -> + run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, UserName]), + ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, UserName]), + emqx:subscribe(Topic, UserName, Opts); + _ -> + ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic]) + end; + +with_context(metrics, Name, Ctx, _ClientInfo) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name). + %%-------------------------------------------------------------------- %% Call Chain %%-------------------------------------------------------------------- call_session(Fun, Msg, #channel{session = Session, - validator = Validator} = Channel) -> + with_context = WithContext} = Channel) -> iter([ session, fun process_session/4 , proto, fun process_protocol/4 , return, fun process_return/4 @@ -373,7 +400,7 @@ call_session(Fun, , out, fun process_out/4 , fun process_nothing/3 ], - emqx_lwm2m_session:Fun(Msg, Validator, Session), + emqx_lwm2m_session:Fun(Msg, WithContext, Session), Channel). process_session(Session, Result, Channel, Iter) -> @@ -384,8 +411,8 @@ process_protocol({request, Msg}, Result, Channel, Iter) -> handle_request_protocol(Method, Msg, Result, Channel, Iter); process_protocol(Msg, Result, - #channel{validator = Validator, session = Session} = Channel, Iter) -> - ProtoResult = emqx_lwm2m_session:handle_protocol_in(Msg, Validator, Session), + #channel{with_context = WithContext, session = Session} = Channel, Iter) -> + ProtoResult = emqx_lwm2m_session:handle_protocol_in(Msg, WithContext, Session), iter(Iter, maps:merge(Result, ProtoResult), Channel). handle_request_protocol(post, #coap_message{options = Opts} = Msg, @@ -415,10 +442,10 @@ handle_request_protocol(delete, #coap_message{options = Opts} = Msg, end. do_update(Location, Msg, Result, - #channel{session = Session, validator = Validator} = Channel, Iter) -> + #channel{session = Session, with_context = WithContext} = Channel, Iter) -> case check_location(Location, Channel) of true -> - NewResult = emqx_lwm2m_session:update(Msg, Validator, Session), + NewResult = emqx_lwm2m_session:update(Msg, WithContext, Session), iter(Iter, maps:merge(Result, NewResult), Channel); _ -> iter(Iter, reply({error, not_found}, Msg, Result), Channel) @@ -438,13 +465,8 @@ process_out(Outs, Result, Channel, _) -> Reply -> [Reply | Outs2] end, - %% emqx_gateway_conn bug, work around - case Outs3 of - [] -> - {ok, Channel}; - _ -> - {ok, {outgoing, Outs3}, Channel} - end. + + {ok, {outgoing, Outs3}, Channel}. process_reply(Reply, Result, #channel{session = Session} = Channel, _) -> Session2 = emqx_lwm2m_session:set_reply(Reply, Session), diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl index 925ca1d94..7c0cc95cd 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl @@ -30,20 +30,20 @@ -define(STANDARD, 1). -%-type msg_type() :: <<"create">> -% | <<"delete">> -% | <<"read">> -% | <<"write">> -% | <<"execute">> -% | <<"discover">> -% | <<"write-attr">> -% | <<"observe">> -% | <<"cancel-observe">>. -% - %-type cmd() :: #{ <<"msgType">> := msg_type() - % , <<"data">> := maps() - % %% more keys? - % }. +%%-type msg_type() :: <<"create">> +%% | <<"delete">> +%% | <<"read">> +%% | <<"write">> +%% | <<"execute">> +%% | <<"discover">> +%% | <<"write-attr">> +%% | <<"observe">> +%% | <<"cancel-observe">>. +%% +%%-type cmd() :: #{ <<"msgType">> := msg_type() +%% , <<"data">> := maps() +%% %%%% more keys? +%% }. %%-------------------------------------------------------------------- %% APIs diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index 700302bdc..a1d03e04f 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -22,7 +22,7 @@ -include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl"). %% API --export([new/0, init/3, update/3, reregister/3, on_close/1]). +-export([new/0, init/4, update/3, reregister/3, on_close/1]). -export([ info/1 , info/2 @@ -47,9 +47,10 @@ , wait_ack :: request_context() | undefined , endpoint_name :: binary() | undefined , location_path :: list(binary()) | undefined - , headers :: map() | undefined , reg_info :: map() | undefined , lifetime :: non_neg_integer() | undefined + , is_cache_mode :: boolean() + , mountpoint :: binary() , last_active_at :: non_neg_integer() }). @@ -61,7 +62,7 @@ <<"7">>, <<"9">>, <<"15">>]). %% uplink and downlink topic configuration --define(lwm2m_up_dm_topic, {<<"v1/up/dm">>, 0}). +-define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}). %% steal from emqx_session -define(INFO_KEYS, [subscriptions, @@ -95,41 +96,44 @@ new() -> #session{ coap = emqx_coap_tm:new() , queue = queue:new() , last_active_at = ?NOW + , is_cache_mode = false + , mountpoint = <<>> , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}. --spec init(emqx_coap_message(), function(), session()) -> map(). -init(#coap_message{options = Opts, payload = Payload} = Msg, Validator, Session) -> +-spec init(emqx_coap_message(), binary(), function(), session()) -> map(). +init(#coap_message{options = Opts, + payload = Payload} = Msg, MountPoint, WithContext, Session) -> Query = maps:get(uri_query, Opts), RegInfo = append_object_list(Query, Payload), - Headers = get_headers(RegInfo), LifeTime = get_lifetime(RegInfo), Epn = maps:get(<<"ep">>, Query), Location = [?PREFIX, Epn], - Result = return(register_init(Validator, - Session#session{headers = Headers, - endpoint_name = Epn, - location_path = Location, - reg_info = RegInfo, - lifetime = LifeTime, - queue = queue:new()})), + NewSession = Session#session{endpoint_name = Epn, + location_path = Location, + reg_info = RegInfo, + lifetime = LifeTime, + mountpoint = MountPoint, + is_cache_mode = is_psm(RegInfo) orelse is_qmode(RegInfo), + queue = queue:new()}, + Result = return(register_init(WithContext, NewSession)), Reply = emqx_coap_message:piggyback({ok, created}, Msg), Reply2 = emqx_coap_message:set(location_path, Location, Reply), reply(Reply2, Result#{lifetime => true}). -reregister(Msg, Validator, Session) -> - update(Msg, Validator, <<"register">>, Session). +reregister(Msg, WithContext, Session) -> + update(Msg, WithContext, <<"register">>, Session). -update(Msg, Validator, Session) -> - update(Msg, Validator, <<"update">>, Session). +update(Msg, WithContext, Session) -> + update(Msg, WithContext, <<"update">>, Session). --spec on_close(session()) -> ok. -on_close(#session{endpoint_name = Epn}) -> +-spec on_close(session()) -> binary(). +on_close(Session) -> #{topic := Topic} = downlink_topic(), - MountedTopic = mount(Topic, mountpoint(Epn)), + MountedTopic = mount(Topic, Session), emqx:unsubscribe(MountedTopic), - ok. + MountedTopic. %%-------------------------------------------------------------------- %% Info, Stats @@ -194,15 +198,15 @@ stats(Session) -> info(?STATS_KEYS, Session). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -handle_coap_in(Msg, _Validator, Session) -> +handle_coap_in(Msg, _WithContext, Session) -> call_coap(case emqx_coap_message:is_request(Msg) of true -> handle_request; _ -> handle_response end, Msg, Session#session{last_active_at = ?NOW}). -handle_deliver(Delivers, _Validator, Session) -> - return(deliver(Delivers, Session)). +handle_deliver(Delivers, WithContext, Session) -> + return(deliver(Delivers, WithContext, Session)). timeout({transport, Msg}, _, Session) -> call_coap(timeout, Msg, Session). @@ -214,17 +218,17 @@ set_reply(Msg, #session{coap = Coap} = Session) -> %%-------------------------------------------------------------------- %% Protocol Stack %%-------------------------------------------------------------------- -handle_protocol_in({response, CtxMsg}, Validator, Session) -> - return(handle_coap_response(CtxMsg, Validator, Session)); +handle_protocol_in({response, CtxMsg}, WithContext, Session) -> + return(handle_coap_response(CtxMsg, WithContext, Session)); -handle_protocol_in({ack, CtxMsg}, Validator, Session) -> - return(handle_ack(CtxMsg, Validator, Session)); +handle_protocol_in({ack, CtxMsg}, WithContext, Session) -> + return(handle_ack(CtxMsg, WithContext, Session)); -handle_protocol_in({ack_failure, CtxMsg}, Validator, Session) -> - return(handle_ack_failure(CtxMsg, Validator, Session)); +handle_protocol_in({ack_failure, CtxMsg}, WithContext, Session) -> + return(handle_ack_failure(CtxMsg, WithContext, Session)); -handle_protocol_in({reset, CtxMsg}, Validator, Session) -> - return(handle_ack_reset(CtxMsg, Validator, Session)). +handle_protocol_in({reset, CtxMsg}, WithContext, Session) -> + return(handle_ack_reset(CtxMsg, WithContext, Session)). %%-------------------------------------------------------------------- %% Register @@ -302,50 +306,6 @@ delink(Str) -> Ltrim = binary_util:ltrim(Str, $<), binary_util:rtrim(Ltrim, $>). -get_headers(RegInfo) -> - lists:foldl(fun(K, Acc) -> - get_header(K, RegInfo, Acc) - end, - extract_module_params(RegInfo), - [<<"apn">>, <<"im">>, <<"ct">>, <<"mv">>, <<"mt">>]). - -get_header(Key, RegInfo, Headers) -> - case maps:get(Key, RegInfo, undefined) of - undefined -> - Headers; - Val -> - AtomKey = erlang:binary_to_atom(Key), - Headers#{AtomKey => Val} - end. - -extract_module_params(RegInfo) -> - Keys = [<<"module">>, <<"sv">>, <<"chip">>, <<"imsi">>, <<"iccid">>], - case lists:any(fun(K) -> maps:get(K, RegInfo, undefined) =:= undefined end, Keys) of - true -> #{module_params => undefined}; - false -> - Extras = [<<"rsrp">>, <<"sinr">>, <<"txpower">>, <<"cellid">>], - case lists:any(fun(K) -> maps:get(K, RegInfo, undefined) =:= undefined end, Extras) of - true -> - #{module_params => - #{module => maps:get(<<"module">>, RegInfo), - softversion => maps:get(<<"sv">>, RegInfo), - chiptype => maps:get(<<"chip">>, RegInfo), - imsi => maps:get(<<"imsi">>, RegInfo), - iccid => maps:get(<<"iccid">>, RegInfo)}}; - false -> - #{module_params => - #{module => maps:get(<<"module">>, RegInfo), - softversion => maps:get(<<"sv">>, RegInfo), - chiptype => maps:get(<<"chip">>, RegInfo), - imsi => maps:get(<<"imsi">>, RegInfo), - iccid => maps:get(<<"iccid">>, RegInfo), - rsrp => maps:get(<<"rsrp">>, RegInfo), - sinr => maps:get(<<"sinr">>, RegInfo), - txpower => maps:get(<<"txpower">>, RegInfo), - cellid => maps:get(<<"cellid">>, RegInfo)}} - end - end. - get_lifetime(#{<<"lt">> := LT}) -> case LT of 0 -> emqx:get_config([gateway, lwm2m, lifetime_max]); @@ -362,7 +322,7 @@ get_lifetime(_, OldRegInfo) -> -spec update(emqx_coap_message(), function(), binary(), session()) -> map(). update(#coap_message{options = Opts, payload = Payload} = Msg, - Validator, + WithContext, CmdType, #session{reg_info = OldRegInfo} = Session) -> Query = maps:get(uri_query, Opts), @@ -370,58 +330,51 @@ update(#coap_message{options = Opts, payload = Payload} = Msg, UpdateRegInfo = maps:merge(OldRegInfo, RegInfo), LifeTime = get_lifetime(UpdateRegInfo, OldRegInfo), - Session2 = proto_subscribe(Validator, - Session#session{reg_info = UpdateRegInfo, - lifetime = LifeTime}), + NewSession = Session#session{reg_info = UpdateRegInfo, + is_cache_mode = + is_psm(UpdateRegInfo) orelse is_qmode(UpdateRegInfo), + lifetime = LifeTime}, + + Session2 = proto_subscribe(WithContext, NewSession), Session3 = send_dl_msg(Session2), RegPayload = #{<<"data">> => UpdateRegInfo}, - Session4 = send_to_mqtt(#{}, CmdType, RegPayload, Validator, Session3), + Session4 = send_to_mqtt(#{}, CmdType, RegPayload, WithContext, Session3), Result = return(Session4), Reply = emqx_coap_message:piggyback({ok, changed}, Msg), reply(Reply, Result#{lifetime => true}). -register_init(Validator, #session{reg_info = RegInfo, - endpoint_name = Epn} = Session) -> - +register_init(WithContext, #session{reg_info = RegInfo} = Session) -> Session2 = send_auto_observe(RegInfo, Session), %% - subscribe to the downlink_topic and wait for commands #{topic := Topic, qos := Qos} = downlink_topic(), - MountedTopic = mount(Topic, mountpoint(Epn)), - Session3 = subscribe(MountedTopic, Qos, Validator, Session2), + MountedTopic = mount(Topic, Session), + Session3 = subscribe(MountedTopic, Qos, WithContext, Session2), Session4 = send_dl_msg(Session3), %% - report the registration info RegPayload = #{<<"data">> => RegInfo}, - send_to_mqtt(#{}, <<"register">>, RegPayload, Validator, Session4). + send_to_mqtt(#{}, <<"register">>, RegPayload, WithContext, Session4). %%-------------------------------------------------------------------- %% Subscribe %%-------------------------------------------------------------------- -proto_subscribe(Validator, #session{endpoint_name = Epn, wait_ack = WaitAck} = Session) -> +proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) -> #{topic := Topic, qos := Qos} = downlink_topic(), - MountedTopic = mount(Topic, mountpoint(Epn)), + MountedTopic = mount(Topic, Session), Session2 = case WaitAck of undefined -> Session; Ctx -> MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>), - send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, Validator, Session) + send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, WithContext, Session) end, - subscribe(MountedTopic, Qos, Validator, Session2). + subscribe(MountedTopic, Qos, WithContext, Session2). -subscribe(Topic, Qos, Validator, - #session{headers = Headers, endpoint_name = EndpointName} = Session) -> - case Validator(subscribe, Topic) of - allow -> - ClientId = maps:get(device_id, Headers, undefined), - Opts = get_sub_opts(Qos), - ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, EndpointName]), - emqx:subscribe(Topic, ClientId, Opts); - _ -> - ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic]) - end, +subscribe(Topic, Qos, WithContext, Session) -> + Opts = get_sub_opts(Qos), + WithContext(subscribe, [Topic, Opts]), Session. send_auto_observe(RegInfo, Session) -> @@ -486,7 +439,7 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType}, type = CoapMsgType, payload = CoapMsgPayload, options = CoapMsgOpts}}, - Validator, + WithContext, Session) -> MqttPayload = emqx_lwm2m_cmd:coap_to_mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ctx), {ReqPath, _} = emqx_lwm2m_cmd:path_list(emqx_lwm2m_cmd:extract_path(Ctx)), @@ -495,46 +448,43 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType}, {[<<"5">>| _], _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack -> %% this is a notification for status update during NB firmware upgrade. %% need to reply to DM http callbacks - send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, Validator, Session); + send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, WithContext, Session); {_ReqPath, _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack -> %% this is actually a notification, correct the msgType - send_to_mqtt(Ctx, <<"notify">>, MqttPayload, Validator, Session); + send_to_mqtt(Ctx, <<"notify">>, MqttPayload, WithContext, Session); _ -> - send_to_mqtt(Ctx, EventType, MqttPayload, Validator, Session) + send_to_mqtt(Ctx, EventType, MqttPayload, WithContext, Session) end, send_dl_msg(Ctx, Session2). %%-------------------------------------------------------------------- %% Ack %%-------------------------------------------------------------------- -handle_ack({Ctx, _}, Validator, Session) -> +handle_ack({Ctx, _}, WithContext, Session) -> Session2 = send_dl_msg(Ctx, Session), MqttPayload = emqx_lwm2m_cmd:empty_ack_to_mqtt(Ctx), - send_to_mqtt(Ctx, <<"ack">>, MqttPayload, Validator, Session2). + send_to_mqtt(Ctx, <<"ack">>, MqttPayload, WithContext, Session2). %%-------------------------------------------------------------------- %% Ack Failure(Timeout/Reset) %%-------------------------------------------------------------------- -handle_ack_failure({Ctx, _}, Validator, Session) -> - handle_ack_failure(Ctx, <<"coap_timeout">>, Validator, Session). +handle_ack_failure({Ctx, _}, WithContext, Session) -> + handle_ack_failure(Ctx, <<"coap_timeout">>, WithContext, Session). -handle_ack_reset({Ctx, _}, Validator, Session) -> - handle_ack_failure(Ctx, <<"coap_reset">>, Validator, Session). +handle_ack_reset({Ctx, _}, WithContext, Session) -> + handle_ack_failure(Ctx, <<"coap_reset">>, WithContext, Session). -handle_ack_failure(Ctx, MsgType, Validator, Session) -> +handle_ack_failure(Ctx, MsgType, WithContext, Session) -> Session2 = may_send_dl_msg(coap_timeout, Ctx, Session), MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, MsgType), - send_to_mqtt(Ctx, MsgType, MqttPayload, Validator, Session2). + send_to_mqtt(Ctx, MsgType, MqttPayload, WithContext, Session2). %%-------------------------------------------------------------------- %% Send To CoAP %%-------------------------------------------------------------------- -may_send_dl_msg(coap_timeout, Ctx, #session{headers = Headers, - reg_info = RegInfo, - wait_ack = WaitAck} = Session) -> - Lwm2mMode = maps:get(lwm2m_model, Headers, undefined), - case is_cache_mode(Lwm2mMode, RegInfo, Session) of +may_send_dl_msg(coap_timeout, Ctx, #session{wait_ack = WaitAck} = Session) -> + case is_cache_mode(Session) of false -> send_dl_msg(Ctx, Session); true -> case WaitAck of @@ -545,14 +495,11 @@ may_send_dl_msg(coap_timeout, Ctx, #session{headers = Headers, end end. -is_cache_mode(Lwm2mMode, RegInfo, #session{last_active_at = LastActiveAt}) -> - case Lwm2mMode =:= psm orelse is_psm(RegInfo) orelse is_qmode(RegInfo) of - true -> - QModeTimeWind = emqx:get_config([gateway, lwm2m, qmode_time_window]), - Now = ?NOW, - (Now - LastActiveAt) >= QModeTimeWind; - false -> false - end. +is_cache_mode(#session{is_cache_mode = IsCacheMode, + last_active_at = LastActiveAt}) -> + IsCacheMode andalso + ((?NOW - LastActiveAt) >= + emqx:get_config([gateway, lwm2m, qmode_time_window])). is_psm(#{<<"apn">> := APN}) when APN =:= <<"Ctnb">>; APN =:= <<"psmA.eDRX0.ctnb">>; @@ -611,54 +558,27 @@ send_msg_not_waiting_ack(Ctx, Req, Session) -> %%-------------------------------------------------------------------- %% Send To MQTT %%-------------------------------------------------------------------- -send_to_mqtt(Ref, EventType, Payload, Validator, Session = #session{headers = Headers}) -> +send_to_mqtt(Ref, EventType, Payload, WithContext, Session) -> #{topic := Topic, qos := Qos} = uplink_topic(EventType), - NHeaders = extract_ext_flags(Headers), Mheaders = maps:get(mheaders, Ref, #{}), - NHeaders1 = maps:merge(NHeaders, Mheaders), - proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, NHeaders1, Validator, Session). + proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, Mheaders, WithContext, Session). send_to_mqtt(Ctx, EventType, Payload, {Topic, Qos}, - Validator, #session{headers = Headers} = Session) -> + WithContext, Session) -> Mheaders = maps:get(mheaders, Ctx, #{}), - NHeaders = extract_ext_flags(Headers), - NHeaders1 = maps:merge(NHeaders, Mheaders), - proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, NHeaders1, Validator, Session). + proto_publish(Topic, Payload#{<<"msgType">> => EventType}, Qos, Mheaders, WithContext, Session). -proto_publish(Topic, Payload, Qos, Headers, Validator, +proto_publish(Topic, Payload, Qos, Headers, WithContext, #session{endpoint_name = Epn} = Session) -> - MountedTopic = mount(Topic, mountpoint(Epn)), - _ = case Validator(publish, MountedTopic) of - allow -> - Msg = emqx_message:make(Epn, Qos, MountedTopic, - emqx_json:encode(Payload), #{}, Headers), - emqx:publish(Msg); - _ -> - ?LOG(error, "topic:~p not allow to publish ", [MountedTopic]) - end, + MountedTopic = mount(Topic, Session), + Msg = emqx_message:make(Epn, Qos, MountedTopic, + emqx_json:encode(Payload), #{}, Headers), + WithContext(publish, [MountedTopic, Msg]), Session. -mountpoint(Epn) -> - Prefix = emqx:get_config([gateway, lwm2m, mountpoint]), - <>. - -mount(Topic, MountPoint) when is_binary(Topic), is_binary(MountPoint) -> +mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) -> <>. -extract_ext_flags(Headers) -> - Header0 = #{is_tr => maps:get(is_tr, Headers, true)}, - check(Header0, Headers, [sota_type, appId, nbgwFlag]). - -check(Params, _Headers, []) -> Params; -check(Params, Headers, [Key | Rest]) -> - case maps:get(Key, Headers, null) of - V when V == undefined; V == null -> - check(Params, Headers, Rest); - Value -> - Params1 = Params#{Key => Value}, - check(Params1, Headers, Rest) - end. - downlink_topic() -> emqx:get_config([gateway, lwm2m, translators, command]). @@ -678,29 +598,30 @@ uplink_topic(_) -> %% Deliver %%-------------------------------------------------------------------- -deliver(Delivers, #session{headers = Headers, reg_info = RegInfo} = Session) -> - Lwm2mMode = maps:get(lwm2m_model, Headers, undefined), - IsCacheMode = is_cache_mode(Lwm2mMode, RegInfo, Session), +deliver(Delivers, WithContext, #session{reg_info = RegInfo} = Session) -> + IsCacheMode = is_cache_mode(Session), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), lists:foldl(fun({deliver, _, MQTT}, Acc) -> deliver_to_coap(AlternatePath, - MQTT#message.payload, MQTT, IsCacheMode, Acc) + MQTT#message.payload, MQTT, IsCacheMode, WithContext, Acc) end, Session, Delivers). -deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, Session) when is_binary(JsonData)-> +deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, WithContext, Session) when is_binary(JsonData)-> try TermData = emqx_json:decode(JsonData, [return_maps]), - deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, Session) + deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session) catch ExClass:Error:ST -> ?LOG(error, "deliver_to_coap - Invalid JSON: ~0p, Exception: ~0p, stacktrace: ~0p", [JsonData, {ExClass, Error}, ST]), + WithContext(metrics, 'delivery.dropped'), Session end; -deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, Session) when is_map(TermData) -> +deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session) when is_map(TermData) -> + WithContext(metrics, 'messages.delivered'), {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData), ExpiryTime = get_expiry_time(MQTT), maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session). diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl index 5df13f8d6..28edda7ef 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl @@ -35,14 +35,14 @@ gateway.lwm2m { lifetime_max = 86400s qmode_time_windonw = 22 auto_observe = false - mountpoint = \"lwm2m\" + mountpoint = \"lwm2m/%u\" update_msg_publish_condition = contains_object_list translators { - command = {topic = \"dn/#\", qos = 0} - response = {topic = \"up/resp\", qos = 0} - notify = {topic = \"up/notify\", qos = 0} - register = {topic = \"up/resp\", qos = 0} - update = {topic = \"up/resp\", qos = 0} + command = {topic = \"/dn/#\", qos = 0} + response = {topic = \"/up/resp\", qos = 0} + notify = {topic = \"/up/notify\", qos = 0} + register = {topic = \"/up/resp\", qos = 0} + update = {topic = \"/up/resp\", qos = 0} } listeners.udp.default { bind = 5783 @@ -66,7 +66,7 @@ all() -> , {group, test_grp_6_observe} %% {group, test_grp_8_object_19} - %% {group, test_grp_9_psm_queue_mode} + , {group, test_grp_9_psm_queue_mode} ]. suite() -> [{timetrap, {seconds, 90}}]. @@ -1750,7 +1750,7 @@ server_cache_mode(Config, RegOption) -> verify_read_response_1(0, UdpSock), %% server inters into PSM mode - timer:sleep(2), + timer:sleep(2500), %% verify server caches downlink commands send_read_command_1(1, UdpSock), @@ -1797,6 +1797,7 @@ verify_read_response_1(CmdId, UdpSock) -> ReadResult = emqx_json:encode(#{ <<"requestID">> => CmdId, <<"cacheID">> => CmdId, <<"msgType">> => <<"read">>, <<"data">> => #{ + <<"reqPath">> => <<"/3/0/0">>, <<"code">> => <<"2.05">>, <<"codeMsg">> => <<"content">>, <<"content">> => [#{