diff --git a/apps/emqx_coap/README.md b/apps/emqx_coap/README.md index 2da7b9fca..927666358 100644 --- a/apps/emqx_coap/README.md +++ b/apps/emqx_coap/README.md @@ -151,8 +151,9 @@ To subscribe any topic, issue following command: - if clientid is absent, a "bad_request" will be returned. - {topicname} in URI should be percent-encoded to prevent special characters, such as + and #. - {username} and {password} are optional. -- if {username} and {password} are not correct, an uauthorized error will be returned. +- if {username} or {password} is incorrect, the error code `uauthorized` will be returned. - topic is subscribed with qos1. +- if the subscription failed due to ACL deny, the error code `forbidden` will be returned. CoAP Client Unobserve Operation (unsubscribe topic) --------------------------------------------------- @@ -168,7 +169,7 @@ To cancel observation, issue following command: - if clientid is absent, a "bad_request" will be returned. - {topicname} in URI should be percent-encoded to prevent special characters, such as + and #. - {username} and {password} are optional. -- if {username} and {password} are not correct, an uauthorized error will be returned. +- if {username} or {password} is incorrect, the error code `uauthorized` will be returned. CoAP Client Notification Operation (subscribed Message) ------------------------------------------------------- @@ -179,7 +180,7 @@ Server will issue an observe-notification as a subscribed message. CoAP Client Publish Operation ----------------------------- -Issue a coap put command to do publishment. For example: +Issue a coap put command to publish messages. For example: ``` PUT coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password} @@ -191,10 +192,11 @@ Issue a coap put command to do publishment. For example: - if clientid is absent, a "bad_request" will be returned. - {topicname} in URI should be percent-encoded to prevent special characters, such as + and #. - {username} and {password} are optional. -- if {username} and {password} are not correct, an uauthorized error will be returned. +- if {username} or {password} is incorrect, the error code `uauthorized` will be returned. - payload could be any binary data. - payload data type is "application/octet-stream". - publish message will be sent with qos0. +- if the publishing failed due to ACL deny, the error code `forbidden` will be returned. CoAP Client Keep Alive ---------------------- @@ -209,7 +211,7 @@ Device should issue a get command periodically, serve as a ping to keep mqtt ses - {any_topicname} is optional, and should be percent-encoded to prevent special characters. - {clientid} is mandatory. If clientid is absent, a "bad_request" will be returned. - {username} and {password} are optional. -- if {username} and {password} are not correct, an uauthorized error will be returned. +- if {username} or {password} is incorrect, the error code `uauthorized` will be returned. - coap client should do keepalive work periodically to keep mqtt session online, especially those devices in a NAT network. diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index 537f5137b..f6fb4c716 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -133,8 +133,8 @@ init({ClientId, Username, Password, Channel}) -> handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicList}) -> NewTopics = proplists:delete(Topic, TopicList), IsWild = emqx_topic:wildcard(Topic), - chann_subscribe(Topic, State), - {reply, ok, State#state{sub_topics = [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate}; + {reply, chann_subscribe(Topic, State), State#state{sub_topics = + [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate}; handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) -> NewTopics = proplists:delete(Topic, TopicList), @@ -142,8 +142,7 @@ handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = Top {reply, ok, State#state{sub_topics = NewTopics}, hibernate}; handle_call({publish, Topic, Payload}, _From, State) -> - _ = chann_publish(Topic, Payload, State), - {reply, ok, State}; + {reply, chann_publish(Topic, Payload, State), State}; handle_call(info, _From, State) -> {reply, info(State), State}; @@ -221,10 +220,12 @@ chann_subscribe(Topic, State = #state{clientid = ClientId}) -> case emqx_access_control:check_acl(clientinfo(State), subscribe, Topic) of allow -> emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS), - emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]); + emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]), + ok; deny -> ?LOG(warning, "subscribe to ~p by clientid ~p failed due to acl check.", - [Topic, ClientId]) + [Topic, ClientId]), + {error, forbidden} end. chann_unsubscribe(Topic, State) -> @@ -239,10 +240,12 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) -> allow -> emqx_broker:publish( emqx_message:set_flag(retain, false, - emqx_message:make(ClientId, ?QOS_0, Topic, Payload))); + emqx_message:make(ClientId, ?QOS_0, Topic, Payload))), + ok; deny -> ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.", - [Topic, ClientId]) + [Topic, ClientId]), + {error, forbidden} end. diff --git a/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl b/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl index 875a367b4..da066bb36 100644 --- a/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl +++ b/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl @@ -112,14 +112,16 @@ coap_observe(ChId, ?PS_PREFIX, TopicPath, Ack, Content) when TopicPath =/= [] -> Topic = topic(TopicPath), ?LOG(debug, "observe Topic=~p, Ack=~p,Content=~p", [Topic, Ack, Content]), Pid = get(mqtt_client_pid), - emqx_coap_mqtt_adapter:subscribe(Pid, Topic), - Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of - true -> - nocontent; - false-> - content - end, - {ok, {state, ChId, ?PS_PREFIX, [Topic]}, Code, Content}; + case emqx_coap_mqtt_adapter:subscribe(Pid, Topic) of + ok -> + Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of + true -> nocontent; + false-> content + end, + {ok, {state, ChId, ?PS_PREFIX, [Topic]}, Code, Content}; + {error, Code} -> + {error, Code} + end; coap_observe(ChId, Prefix, TopicPath, Ack, _Content) -> ?LOG(error, "unknown observe request ChId=~p, Prefix=~p, TopicPath=~p, Ack=~p", [ChId, Prefix, TopicPath, Ack]), @@ -222,17 +224,19 @@ format_string_to_int(<<"application/json">>) -> handle_received_publish(Topic, MaxAge, Format, Payload) -> case add_topic_info(publish, Topic, MaxAge, format_string_to_int(Format), Payload) of - {Ret ,true} -> + {Ret, true} -> Pid = get(mqtt_client_pid), - emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload), - Content = case Ret of - changed -> - #coap_content{}; - created -> - LocPath = concatenate_location_path([<<"ps">>, Topic, <<>>]), - #coap_content{location_path = [LocPath]} - end, - {ok, Ret, Content}; + case emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload) of + ok -> + {ok, Ret, case Ret of + changed -> #coap_content{}; + created -> + #coap_content{location_path = [ + concatenate_location_path([<<"ps">>, Topic, <<>>])]} + end}; + {error, Code} -> + {error, Code} + end; {_, false} -> ?LOG(debug, "add_topic_info failed, will return bad_request", []), {error, bad_request} diff --git a/apps/emqx_coap/src/emqx_coap_resource.erl b/apps/emqx_coap/src/emqx_coap_resource.erl index e11788a04..e46317347 100644 --- a/apps/emqx_coap/src/emqx_coap_resource.erl +++ b/apps/emqx_coap/src/emqx_coap_resource.erl @@ -56,7 +56,7 @@ coap_get(ChId, ?MQTT_PREFIX, Path, Query, _Content) -> #coap_content{}; {error, auth_failure} -> put(mqtt_client_pid, undefined), - {error, unauthorized}; + {error, forbidden}; {error, bad_request} -> put(mqtt_client_pid, undefined), {error, bad_request}; @@ -74,8 +74,7 @@ coap_post(_ChId, _Prefix, _Topic, _Content) -> coap_put(_ChId, ?MQTT_PREFIX, Topic, #coap_content{payload = Payload}) when Topic =/= [] -> ?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]), Pid = get(mqtt_client_pid), - emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload), - ok; + emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload); coap_put(_ChId, Prefix, Topic, Content) -> ?LOG(error, "put has error, Prefix=~p, Topic=~p, Content=~p", [Prefix, Topic, Content]), {error, bad_request}. @@ -87,8 +86,10 @@ coap_observe(ChId, ?MQTT_PREFIX, Topic, Ack, Content) when Topic =/= [] -> TrueTopic = topic(Topic), ?LOG(debug, "observe Topic=~p, Ack=~p", [TrueTopic, Ack]), Pid = get(mqtt_client_pid), - emqx_coap_mqtt_adapter:subscribe(Pid, TrueTopic), - {ok, {state, ChId, ?MQTT_PREFIX, [TrueTopic]}, content, Content}; + case emqx_coap_mqtt_adapter:subscribe(Pid, TrueTopic) of + ok -> {ok, {state, ChId, ?MQTT_PREFIX, [TrueTopic]}, content, Content}; + {error, Code} -> {error, Code} + end; coap_observe(ChId, Prefix, Topic, Ack, _Content) -> ?LOG(error, "unknown observe request ChId=~p, Prefix=~p, Topic=~p, Ack=~p", [ChId, Prefix, Topic, Ack]), {error, bad_request}. diff --git a/apps/emqx_coap/test/emqx_coap_SUITE.erl b/apps/emqx_coap/test/emqx_coap_SUITE.erl index 672113e57..0faa4965c 100644 --- a/apps/emqx_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_coap/test/emqx_coap_SUITE.erl @@ -68,6 +68,25 @@ t_publish(_Config) -> ?assert(false) end. +t_publish_acl_deny(_Config) -> + Topic = <<"abc">>, Payload = <<"123">>, + TopicStr = binary_to_list(Topic), + URI = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret", + + %% Sub topic first + emqx:subscribe(Topic), + + ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history]), + ok = meck:expect(emqx_access_control, check_acl, 3, deny), + Reply = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}), + ?assertEqual({error,forbidden}, Reply), + ok = meck:unload(emqx_access_control), + receive + {deliver, Topic, Msg} -> ct:fail({unexpected, {Topic, Msg}}) + after + 500 -> ok + end. + t_observe(_Config) -> Topic = <<"abc">>, TopicStr = binary_to_list(Topic), Payload = <<"123">>, @@ -91,6 +110,15 @@ t_observe(_Config) -> [] = emqx:subscribers(Topic). +t_observe_acl_deny(_Config) -> + Topic = <<"abc">>, TopicStr = binary_to_list(Topic), + Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret", + ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history]), + ok = meck:expect(emqx_access_control, check_acl, 3, deny), + ?assertEqual({error,forbidden}, er_coap_observer:observe(Uri)), + [] = emqx:subscribers(Topic), + ok = meck:unload(emqx_access_control). + t_observe_wildcard(_Config) -> Topic = <<"+/b">>, TopicStr = http_uri:encode(binary_to_list(Topic)), Payload = <<"123">>,