%%-------------------------------------------------------------------- %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_coap_pubsub_resource). -behaviour(coap_resource). -include("emqx_coap.hrl"). -include_lib("gen_coap/include/coap.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). -logger_header("[CoAP-PS-RES]"). -export([ coap_discover/2 , coap_get/5 , coap_post/4 , coap_put/4 , coap_delete/3 , coap_observe/5 , coap_unobserve/1 , handle_info/2 , coap_ack/2 ]). -ifdef(TEST). -export([topic/1]). -endif. -define(PS_PREFIX, [<<"ps">>]). %%-------------------------------------------------------------------- %% Resource Callbacks %%-------------------------------------------------------------------- coap_discover(_Prefix, _Args) -> [{absolute, [<<"ps">>], []}]. coap_get(ChId, ?PS_PREFIX, TopicPath, Query, Content=#coap_content{format = Format}) when TopicPath =/= [] -> Topic = topic(TopicPath), ?LOG(debug, "coap_get() Topic=~p, Query=~p~n", [Topic, Query]), #coap_mqtt_auth{clientid = Clientid, username = Usr, password = Passwd} = get_auth(Query), case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of {ok, Pid} -> put(mqtt_client_pid, Pid), case Format of <<"application/link-format">> -> Content; _Other -> %% READ the topic info read_last_publish_message(emqx_topic:wildcard(Topic), Topic, Content) end; {error, auth_failure} -> put(mqtt_client_pid, undefined), {error, unauthorized}; {error, bad_request} -> put(mqtt_client_pid, undefined), {error, bad_request}; {error, _Other} -> put(mqtt_client_pid, undefined), {error, internal_server_error} end; coap_get(ChId, Prefix, TopicPath, Query, _Content) -> ?LOG(error, "ignore bad get request ChId=~p, Prefix=~p, TopicPath=~p, Query=~p", [ChId, Prefix, TopicPath, Query]), {error, bad_request}. coap_post(_ChId, ?PS_PREFIX, TopicPath, #coap_content{format = Format, payload = Payload, max_age = MaxAge}) when TopicPath =/= [] -> Topic = topic(TopicPath), ?LOG(debug, "coap_post() Topic=~p, MaxAge=~p, Format=~p~n", [Topic, MaxAge, Format]), case Format of %% We treat ct of "application/link-format" as CREATE message <<"application/link-format">> -> handle_received_create(Topic, MaxAge, Payload); %% We treat ct of other values as PUBLISH message Other -> ?LOG(debug, "coap_post() receive payload format=~p, will process as PUBLISH~n", [Format]), handle_received_publish(Topic, MaxAge, Other, Payload) end; coap_post(_ChId, _Prefix, _TopicPath, _Content) -> {error, method_not_allowed}. coap_put(_ChId, ?PS_PREFIX, TopicPath, #coap_content{max_age = MaxAge, format = Format, payload = Payload}) when TopicPath =/= [] -> Topic = topic(TopicPath), ?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]), handle_received_publish(Topic, MaxAge, Format, Payload); coap_put(_ChId, Prefix, TopicPath, Content) -> ?LOG(error, "put has error, Prefix=~p, TopicPath=~p, Content=~p", [Prefix, TopicPath, Content]), {error, bad_request}. coap_delete(_ChId, ?PS_PREFIX, TopicPath) -> delete_topic_info(topic(TopicPath)); coap_delete(_ChId, _Prefix, _TopicPath) -> {error, method_not_allowed}. coap_observe(ChId, ?PS_PREFIX, TopicPath, Ack, Content) when TopicPath =/= [] -> Topic = topic(TopicPath), ?LOG(debug, "observe Topic=~p, Ack=~p,Content=~p", [Topic, Ack, Content]), Pid = get(mqtt_client_pid), case emqx_coap_mqtt_adapter:subscribe(Pid, Topic) of ok -> Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of true -> nocontent; false-> content end, {ok, {state, ChId, ?PS_PREFIX, [Topic]}, Code, Content}; {error, Code} -> {error, Code} end; coap_observe(ChId, Prefix, TopicPath, Ack, _Content) -> ?LOG(error, "unknown observe request ChId=~p, Prefix=~p, TopicPath=~p, Ack=~p", [ChId, Prefix, TopicPath, Ack]), {error, bad_request}. coap_unobserve({state, _ChId, ?PS_PREFIX, TopicPath}) when TopicPath =/= [] -> Topic = topic(TopicPath), ?LOG(debug, "unobserve ~p", [Topic]), Pid = get(mqtt_client_pid), emqx_coap_mqtt_adapter:unsubscribe(Pid, Topic), ok; coap_unobserve({state, ChId, Prefix, TopicPath}) -> ?LOG(error, "ignore unknown unobserve request ChId=~p, Prefix=~p, TopicPath=~p", [ChId, Prefix, TopicPath]), ok. handle_info({dispatch, Topic, Payload}, State) -> ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]), {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload), ?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]), {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State}; handle_info(Message, State) -> ?LOG(error, "Unknown Message ~p", [Message]), {noreply, State}. coap_ack(_Ref, State) -> {ok, State}. %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- get_auth(Query) -> get_auth(Query, #coap_mqtt_auth{}). get_auth([], Auth=#coap_mqtt_auth{}) -> Auth; get_auth([<<$c, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) -> get_auth(T, Auth#coap_mqtt_auth{clientid = Rest}); get_auth([<<$u, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) -> get_auth(T, Auth#coap_mqtt_auth{username = Rest}); get_auth([<<$p, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) -> get_auth(T, Auth#coap_mqtt_auth{password = Rest}); get_auth([Param|T], Auth=#coap_mqtt_auth{}) -> ?LOG(error, "ignore unknown parameter ~p", [Param]), get_auth(T, Auth). add_topic_info(publish, Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>> -> case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of [{_, StoredMaxAge, StoredCT, _, _}] -> ?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]), %% check whether the ct value stored matches the ct option in this POST message case Format =:= StoredCT of true -> {ok, Ret} = case StoredMaxAge =:= MaxAge of true -> emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload); false -> emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload) end, {changed, Ret}; false -> ?LOG(debug, "ct values of topic=~p do not match, stored ct=~p, new ct=~p, ignore the PUBLISH", [Topic, StoredCT, Format]), {changed, false} end; [] -> ?LOG(debug, "publish topic=~p will be created", [Topic]), {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload), {created, Ret} end; add_topic_info(create, Topic, MaxAge, Format, _Payload) when is_binary(Topic), Topic =/= <<>> -> case emqx_coap_pubsub_topics:is_topic_existed(Topic) of true -> %% Whether we should support CREATE to an existed topic is TBD!! ?LOG(debug, "create topic=~p already exists, need reset the topic info", [Topic]), {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Format, <<>>); false -> ?LOG(debug, "create topic=~p will be created", [Topic]), {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, <<>>) end, {created, Ret}; add_topic_info(_, Topic, _MaxAge, _Format, _Payload) -> ?LOG(debug, "create topic=~p info failed", [Topic]), {badarg, false}. concatenate_location_path(List = [TopicPart1, TopicPart2, TopicPart3]) when is_binary(TopicPart1), is_binary(TopicPart2), is_binary(TopicPart3) -> list_to_binary(lists:foldl( fun (Element, AccIn) when Element =/= <<>> -> AccIn ++ "/" ++ binary_to_list(Element); (_Element, AccIn) -> AccIn end, [], List)). format_string_to_int(<<"application/octet-stream">>) -> <<"42">>; format_string_to_int(<<"application/exi">>) -> <<"47">>; format_string_to_int(<<"application/json">>) -> <<"50">>. handle_received_publish(Topic, MaxAge, Format, Payload) -> case add_topic_info(publish, Topic, MaxAge, format_string_to_int(Format), Payload) of {Ret, true} -> Pid = get(mqtt_client_pid), case emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload) of ok -> {ok, Ret, case Ret of changed -> #coap_content{}; created -> #coap_content{location_path = [ concatenate_location_path([<<"ps">>, Topic, <<>>])]} end}; {error, Code} -> {error, Code} end; {_, false} -> ?LOG(debug, "add_topic_info failed, will return bad_request", []), {error, bad_request} end. handle_received_create(TopicPrefix, MaxAge, Payload) -> case core_link:decode(Payload) of [{rootless, [Topic], [{ct, CT}]}] when is_binary(Topic), Topic =/= <<>> -> TrueTopic = emqx_http_lib:uri_decode(Topic), ?LOG(debug, "decoded link-format payload, the Topic=~p, CT=~p~n", [TrueTopic, CT]), LocPath = concatenate_location_path([<<"ps">>, TopicPrefix, TrueTopic]), FullTopic = binary:part(LocPath, 4, byte_size(LocPath)-4), ?LOG(debug, "the location path is ~p, the full topic is ~p~n", [LocPath, FullTopic]), case add_topic_info(create, FullTopic, MaxAge, CT, <<>>) of {_, true} -> ?LOG(debug, "create topic info successfully, will return LocPath=~p", [LocPath]), {ok, created, #coap_content{location_path = [LocPath]}}; {_, false} -> ?LOG(debug, "create topic info failed, will return bad_request", []), {error, bad_request} end; Other -> ?LOG(debug, "post with bad payload of link-format ~p, will return bad_request", [Other]), {error, bad_request} end. %% When topic is timeout, server should return nocontent here, %% but gen_coap only receive return value of #coap_content from coap_get, so temporarily we can't give the Code 2.07 {ok, nocontent} out.TBC!!! return_resource(Topic, Payload, MaxAge, TimeStamp, Content) -> TimeElapsed = trunc((erlang:system_time(millisecond) - TimeStamp) / 1000), case TimeElapsed < MaxAge of true -> LeftTime = (MaxAge - TimeElapsed), ?LOG(debug, "topic=~p has max age left time is ~p", [Topic, LeftTime]), Content#coap_content{max_age = LeftTime, payload = Payload}; false -> ?LOG(debug, "topic=~p has been timeout, will return empty content", [Topic]), #coap_content{} end. read_last_publish_message(false, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)-> ?LOG(debug, "the QueryFormat=~p", [QueryFormat]), case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of [] -> {error, not_found}; [{_, MaxAge, CT, Payload, TimeStamp}] -> case CT =:= format_string_to_int(QueryFormat) of true -> return_resource(Topic, Payload, MaxAge, TimeStamp, Content); false -> ?LOG(debug, "format value does not match, the queried format=~p, the stored format=~p", [QueryFormat, CT]), {error, bad_request} end end; read_last_publish_message(false, Topic, Content) -> case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of [] -> {error, not_found}; [{_, MaxAge, _, Payload, TimeStamp}] -> return_resource(Topic, Payload, MaxAge, TimeStamp, Content) end; read_last_publish_message(true, Topic, _Content) -> ?LOG(debug, "the topic=~p is illegal wildcard topic", [Topic]), {error, bad_request}. delete_topic_info(Topic) -> case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of [] -> {error, not_found}; [{_, _, _, _, _}] -> emqx_coap_pubsub_topics:delete_sub_topics(Topic) end. topic(Topic) when is_binary(Topic) -> Topic; topic([]) -> <<>>; topic([Path | TopicPath]) -> case topic(TopicPath) of <<>> -> Path; RemTopic -> <> end.