fix: run the 'message.acked' hook when ACK received from CoAP devices

This commit is contained in:
Shawn 2022-10-28 22:23:52 +08:00
parent dd95a26270
commit 3f17119e36
8 changed files with 91 additions and 23 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_coap,
[{description, "EMQ X CoAP Gateway"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gen_coap]},

View File

@ -1,9 +1,15 @@
%% -*-: erlang -*-
{VSN,
[{"4.3.0",[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
[{<<"4\\.3\\.[0-1]">>,[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}],
[{"4.3.0",[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
[{<<"4\\.3\\.[0-1]">>,[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}]
}.

View File

@ -31,6 +31,9 @@
-export([ subscribe/2
, unsubscribe/2
, publish/3
, received_puback/2
, message_payload/1
, message_topic/1
]).
-export([ client_pid/4
@ -95,6 +98,15 @@ unsubscribe(Pid, Topic) ->
publish(Pid, Topic, Payload) ->
gen_server:call(Pid, {publish, Topic, Payload}).
received_puback(Pid, Msg) ->
gen_server:cast(Pid, {received_puback, Msg}).
message_payload(#message{payload = Payload}) ->
Payload.
message_topic(#message{topic = Topic}) ->
Topic.
%% For emqx_management plugin
call(Pid, Msg) ->
call(Pid, Msg, infinity).
@ -172,13 +184,19 @@ handle_call(Request, _From, State) ->
?LOG(error, "adapter unexpected call ~p", [Request]),
{reply, ignored, State, hibernate}.
handle_cast({received_puback, Msg}, State) ->
%% NOTE: the counter named 'messages.acked', but the hook named 'message.acked'!
ok = emqx_metrics:inc('messages.acked'),
_ = emqx_hooks:run('message.acked', [conninfo(State), Msg]),
{noreply, State, hibernate};
handle_cast(Msg, State) ->
?LOG(error, "broker_api unexpected cast ~p", [Msg]),
{noreply, State, hibernate}.
handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}},
handle_info({deliver, _Topic, #message{} = Msg},
State = #state{sub_topics = Subscribers}) ->
deliver([{Topic, Payload}], Subscribers),
deliver([Msg], Subscribers),
{noreply, State, hibernate};
handle_info(check_alive, State = #state{sub_topics = []}) ->
@ -271,27 +289,25 @@ packet_to_message(Topic, Payload,
%% Deliver
deliver([], _) -> ok;
deliver([Pub | More], Subscribers) ->
ok = do_deliver(Pub, Subscribers),
deliver([Msg | More], Subscribers) ->
ok = do_deliver(Msg, Subscribers),
deliver(More, Subscribers).
do_deliver({Topic, Payload}, Subscribers) ->
do_deliver(Msg, Subscribers) ->
%% handle PUBLISH packet from broker
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
deliver_to_coap(Topic, Payload, Subscribers),
?LOG(debug, "deliver message from broker, msg: ~p", [Msg]),
deliver_to_coap(Msg, Subscribers),
ok.
deliver_to_coap(_TopicName, _Payload, []) ->
deliver_to_coap(_Msg, []) ->
ok;
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
deliver_to_coap(#message{topic = TopicName} = Msg, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
Matched = case IsWild of
true -> emqx_topic:match(TopicName, TopicFilter);
false -> TopicName =:= TopicFilter
end,
%?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p",
% [Matched, CoapPid, TopicName, Payload, T]),
Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
deliver_to_coap(TopicName, Payload, T).
Matched andalso (CoapPid ! {dispatch, Msg}),
deliver_to_coap(Msg, T).
%%--------------------------------------------------------------------
%% Helper funcs
@ -328,12 +344,13 @@ chann_info(State) ->
will_msg => undefined
}.
conninfo(#state{peername = Peername,
conninfo(#state{peername = {PeerHost, _} = Peername,
clientid = ClientId,
connected_at = ConnectedAt}) ->
#{socktype => udp,
sockname => {{127, 0, 0, 1}, 5683},
peername => Peername,
peerhost => PeerHost,
peercert => nossl, %% TODO: dtls
conn_mod => ?MODULE,
proto_name => <<"CoAP">>,

View File

@ -138,16 +138,32 @@ coap_unobserve({state, ChId, Prefix, TopicPath}) ->
ok.
handle_info({dispatch, Topic, Payload}, State) ->
%% This clause should never be matched any more. We keep it here to handle
%% the old format messages during the release upgrade.
%% In this case the second function clause of `coap_ack/2` will be called,
%% and the ACKs is discarded.
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info({dispatch, Msg}, State) ->
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
Topic = emqx_coap_mqtt_adapter:message_topic(Msg),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) ->
?LOG(error, "Unknown Message ~p", [Message]),
{noreply, State}.
coap_ack(_Ref, State) -> {ok, State}.
coap_ack({pub, Msg}, State) ->
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
{ok, State};
coap_ack(_Ref, State) ->
?LOG(debug, "received coap ack: ~p", [_Ref]),
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions

View File

@ -104,12 +104,26 @@ coap_unobserve({state, ChId, Prefix, Topic}) ->
ok.
handle_info({dispatch, Topic, Payload}, State) ->
%% This clause should never be matched any more. We keep it here to handle
%% the old format messages during the release upgrade.
%% In this case the second function clause of `coap_ack/2` will be called,
%% and the ACKs is discarded.
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info({dispatch, Msg}, State) ->
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) ->
emqx_coap_mqtt_adapter:handle_info(Message, State).
coap_ack(_Ref, State) -> {ok, State}.
coap_ack({pub, Msg}, State) ->
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
{ok, State};
coap_ack(_Ref, State) ->
?LOG(debug, "received coap ack: ~p", [_Ref]),
{ok, State}.
get_auth(Query) ->
get_auth(Query, #coap_mqtt_auth{}).

View File

@ -91,7 +91,7 @@ t_observe(_Config) ->
Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
Payload = <<"123">>,
Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]),
[SubPid] = emqx:subscribers(Topic),
@ -195,12 +195,16 @@ t_one_clientid_sub_2_topics(_Config) ->
[SubPid] = emqx:subscribers(Topic2),
?assert(is_pid(SubPid)),
CntrAcked1 = emqx_metrics:val('messages.acked'),
emqx:publish(emqx_message:make(Topic1, Payload1)),
Notif1 = receive_notification(),
?LOGT("observer 1 get Notif=~p", [Notif1]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1,
?assertEqual(Payload1, PayloadRecv1),
timer:sleep(100),
CntrAcked2 = emqx_metrics:val('messages.acked'),
?assertEqual(CntrAcked2, CntrAcked1 + 1),
emqx:publish(emqx_message:make(Topic2, Payload2)),
@ -208,6 +212,9 @@ t_one_clientid_sub_2_topics(_Config) ->
?LOGT("observer 2 get Notif=~p", [Notif2]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2,
?assertEqual(Payload2, PayloadRecv2),
timer:sleep(100),
CntrAcked3 = emqx_metrics:val('messages.acked'),
?assertEqual(CntrAcked3, CntrAcked2 + 1),
er_coap_observer:stop(Pid1),
er_coap_observer:stop(Pid2).

View File

@ -2,6 +2,11 @@
## Enhancements
- We now trigger the `'message.acked'` hook after the CoAP gateway sends a message to the device and receives the ACK from the device [#9264](https://github.com/emqx/emqx/pull/9264).
With this change, the CoAP gateway can be combined with the offline message caching function (in the
emqx enterprise), so that CoAP devices are able to read the missed messages from the database when
it is online again.
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).

View File

@ -2,6 +2,9 @@
## 增强
- 当 CoAP 网关给设备投递消息并收到设备发来的确认之后,回调 `'message.acked'` 钩子 [#9264](https://github.com/emqx/emqx/pull/9264)。
有了这个改动CoAP 网关可以配合 EMQX (企业版)的离线消息缓存功能,让 CoAP 设备重新上线之后,从数据库读取其离线状态下错过的消息。
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。