Line data Source code
1 : /*
2 : *
3 : * Copyright (c) 2021 Project CHIP Authors
4 : * All rights reserved.
5 : *
6 : * Licensed under the Apache License, Version 2.0 (the "License");
7 : * you may not use this file except in compliance with the License.
8 : * You may obtain a copy of the License at
9 : *
10 : * http://www.apache.org/licenses/LICENSE-2.0
11 : *
12 : * Unless required by applicable law or agreed to in writing, software
13 : * distributed under the License is distributed on an "AS IS" BASIS,
14 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 : * See the License for the specific language governing permissions and
16 : * limitations under the License.
17 : */
18 :
19 : /**
20 : * @file
21 : * This file defines the initiator side of a CHIP Read Interaction.
22 : *
23 : */
24 :
25 : #include <app/AppConfig.h>
26 : #include <app/InteractionModelEngine.h>
27 : #include <app/InteractionModelHelper.h>
28 : #include <app/ReadClient.h>
29 : #include <app/StatusResponse.h>
30 : #include <assert.h>
31 : #include <lib/core/TLVTypes.h>
32 : #include <lib/support/FibonacciUtils.h>
33 : #include <messaging/ReliableMessageMgr.h>
34 : #include <messaging/ReliableMessageProtocolConfig.h>
35 : #include <platform/LockTracker.h>
36 : #include <tracing/metric_event.h>
37 :
38 : #include <app-common/zap-generated/cluster-objects.h>
39 : #include <app-common/zap-generated/ids/Attributes.h>
40 : #include <app-common/zap-generated/ids/Clusters.h>
41 :
42 : namespace chip {
43 : namespace app {
44 :
45 : using Status = Protocols::InteractionModel::Status;
46 :
47 1120 : ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback,
48 1120 : InteractionType aInteractionType) :
49 1120 : mExchange(*this),
50 1120 : mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
51 2240 : mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
52 : {
53 1120 : assertChipStackLockedByCurrentThread();
54 :
55 1120 : mpExchangeMgr = apExchangeMgr;
56 1120 : mInteractionType = aInteractionType;
57 :
58 1120 : mpImEngine = apImEngine;
59 :
60 1120 : if (aInteractionType == InteractionType::Subscribe)
61 : {
62 316 : mpImEngine->AddReadClient(this);
63 : }
64 1120 : }
65 :
66 282 : void ReadClient::ClearActiveSubscriptionState()
67 : {
68 282 : mIsReporting = false;
69 282 : mWaitingForFirstPrimingReport = true;
70 282 : mPendingMoreChunks = false;
71 282 : mMinIntervalFloorSeconds = 0;
72 282 : mMaxInterval = 0;
73 282 : mSubscriptionId = 0;
74 282 : mIsResubscriptionScheduled = false;
75 :
76 282 : MoveToState(ClientState::Idle);
77 282 : }
78 :
79 584 : void ReadClient::StopResubscription()
80 : {
81 584 : CancelLivenessCheckTimer();
82 584 : CancelResubscribeTimer();
83 :
84 : // Only deallocate the paths if they are not already deallocated.
85 584 : if (mReadPrepareParams.mpAttributePathParamsList != nullptr || mReadPrepareParams.mpEventPathParamsList != nullptr ||
86 358 : mReadPrepareParams.mpDataVersionFilterList != nullptr)
87 : {
88 226 : mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams));
89 : // Make sure we will never try to free those pointers again.
90 226 : mReadPrepareParams.mpAttributePathParamsList = nullptr;
91 226 : mReadPrepareParams.mAttributePathParamsListSize = 0;
92 226 : mReadPrepareParams.mpEventPathParamsList = nullptr;
93 226 : mReadPrepareParams.mEventPathParamsListSize = 0;
94 226 : mReadPrepareParams.mpDataVersionFilterList = nullptr;
95 226 : mReadPrepareParams.mDataVersionFilterListSize = 0;
96 : }
97 584 : }
98 :
99 1208 : ReadClient::~ReadClient()
100 : {
101 1120 : assertChipStackLockedByCurrentThread();
102 :
103 1120 : if (IsSubscriptionType())
104 : {
105 316 : StopResubscription();
106 :
107 : // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it.
108 : // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine
109 : // will point to null)
110 : //
111 316 : if (mpImEngine)
112 : {
113 151 : mpImEngine->RemoveReadClient(this);
114 : }
115 : }
116 1208 : }
117 :
118 13 : uint32_t ReadClient::ComputeTimeTillNextSubscription()
119 : {
120 13 : uint32_t maxWaitTimeInMsec = 0;
121 13 : uint32_t waitTimeInMsec = 0;
122 13 : uint32_t minWaitTimeInMsec = 0;
123 :
124 13 : if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
125 : {
126 13 : maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
127 : }
128 : else
129 : {
130 0 : maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
131 : }
132 :
133 13 : if (maxWaitTimeInMsec != 0)
134 : {
135 3 : minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
136 3 : waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
137 : }
138 :
139 13 : if (mMinimalResubscribeDelay.count() > waitTimeInMsec)
140 : {
141 0 : waitTimeInMsec = mMinimalResubscribeDelay.count();
142 : }
143 :
144 13 : return waitTimeInMsec;
145 : }
146 :
147 13 : CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional<SessionHandle> aNewSessionHandle,
148 : bool aReestablishCASE)
149 : {
150 13 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
151 :
152 : //
153 : // If we're establishing CASE, make sure we are not provided a new SessionHandle as well.
154 : //
155 13 : VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT);
156 :
157 13 : if (aNewSessionHandle.HasValue())
158 : {
159 0 : mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value());
160 : }
161 :
162 13 : mForceCaseOnNextResub = aReestablishCASE;
163 13 : if (mForceCaseOnNextResub && mReadPrepareParams.mSessionHolder)
164 : {
165 : // Mark our existing session defunct, so that we will try to
166 : // re-establish it when the timer fires (unless something re-establishes
167 : // before then).
168 0 : mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct();
169 : }
170 :
171 13 : ReturnErrorOnFailure(
172 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
173 : System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
174 13 : mIsResubscriptionScheduled = true;
175 :
176 13 : return CHIP_NO_ERROR;
177 : }
178 :
179 1022 : void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription)
180 : {
181 1022 : if (IsReadType())
182 : {
183 740 : if (aError != CHIP_NO_ERROR)
184 : {
185 49 : mpCallback.OnError(aError);
186 : }
187 : }
188 : else
189 : {
190 282 : if (IsAwaitingInitialReport() || IsAwaitingSubscribeResponse())
191 : {
192 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, aError);
193 : }
194 :
195 282 : ClearActiveSubscriptionState();
196 282 : if (aError != CHIP_NO_ERROR)
197 : {
198 : //
199 : // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present
200 : // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which
201 : // populates mReadPrepareParams with the values provided by the application.
202 : //
203 115 : if (allowResubscription &&
204 48 : (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0))
205 : {
206 15 : CHIP_ERROR originalReason = aError;
207 :
208 15 : aError = mpCallback.OnResubscriptionNeeded(this, aError);
209 15 : if (aError == CHIP_NO_ERROR)
210 : {
211 15 : return;
212 : }
213 2 : if (aError == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
214 : {
215 2 : VerifyOrDie(originalReason == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT);
216 2 : ChipLogProgress(DataManagement, "ICD device is inactive mark subscription as InactiveICDSubscription");
217 2 : MoveToState(ClientState::InactiveICDSubscription);
218 2 : return;
219 : }
220 : }
221 :
222 : //
223 : // Either something bad happened when requesting resubscription or the application has decided to not
224 : // continue by returning an error. Let's convey the error back up to the application
225 : // and shut everything down.
226 : //
227 100 : mpCallback.OnError(aError);
228 : }
229 :
230 267 : StopResubscription();
231 : }
232 :
233 1007 : mExchange.Release();
234 :
235 1007 : mpCallback.OnDone(this);
236 : }
237 :
238 1961 : const char * ReadClient::GetStateStr() const
239 : {
240 : #if CHIP_DETAIL_LOGGING
241 1961 : switch (mState)
242 : {
243 282 : case ClientState::Idle:
244 282 : return "Idle";
245 1111 : case ClientState::AwaitingInitialReport:
246 1111 : return "AwaitingInitialReport";
247 286 : case ClientState::AwaitingSubscribeResponse:
248 286 : return "AwaitingSubscribeResponse";
249 280 : case ClientState::SubscriptionActive:
250 280 : return "SubscriptionActive";
251 2 : case ClientState::InactiveICDSubscription:
252 2 : return "InactiveICDSubscription";
253 : }
254 : #endif // CHIP_DETAIL_LOGGING
255 0 : return "N/A";
256 : }
257 :
258 1961 : void ReadClient::MoveToState(const ClientState aTargetState)
259 : {
260 1961 : mState = aTargetState;
261 1961 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Moving to [%10.10s]", __func__, this, GetStateStr());
262 1961 : }
263 :
264 868 : CHIP_ERROR ReadClient::SendRequest(ReadPrepareParams & aReadPrepareParams)
265 : {
266 868 : if (mInteractionType == InteractionType::Read)
267 : {
268 792 : return SendReadRequest(aReadPrepareParams);
269 : }
270 :
271 76 : if (mInteractionType == InteractionType::Subscribe)
272 : {
273 76 : return SendSubscribeRequest(aReadPrepareParams);
274 : }
275 :
276 0 : return CHIP_ERROR_INVALID_ARGUMENT;
277 : }
278 :
279 792 : CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams)
280 : {
281 792 : CHIP_ERROR err = CHIP_NO_ERROR;
282 :
283 792 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Sending Read Request", __func__, this);
284 :
285 792 : VerifyOrReturnError(ClientState::Idle == mState, err = CHIP_ERROR_INCORRECT_STATE);
286 :
287 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
288 792 : aReadPrepareParams.mAttributePathParamsListSize);
289 792 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
290 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
291 792 : aReadPrepareParams.mDataVersionFilterListSize);
292 :
293 792 : System::PacketBufferHandle msgBuf;
294 792 : ReadRequestMessage::Builder request;
295 792 : System::PacketBufferTLVWriter writer;
296 :
297 792 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
298 792 : ReturnErrorOnFailure(request.Init(&writer));
299 :
300 792 : if (!attributePaths.empty())
301 : {
302 673 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
303 673 : ReturnErrorOnFailure(err = request.GetError());
304 673 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
305 : }
306 :
307 792 : if (!eventPaths.empty())
308 : {
309 317 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
310 317 : ReturnErrorOnFailure(err = request.GetError());
311 :
312 317 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
313 :
314 317 : Optional<EventNumber> eventMin;
315 317 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
316 317 : if (eventMin.HasValue())
317 : {
318 314 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
319 314 : ReturnErrorOnFailure(err = request.GetError());
320 314 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
321 : }
322 : }
323 :
324 792 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
325 :
326 792 : bool encodedDataVersionList = false;
327 792 : TLV::TLVWriter backup;
328 792 : request.Checkpoint(backup);
329 792 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
330 792 : ReturnErrorOnFailure(request.GetError());
331 792 : if (!attributePaths.empty())
332 : {
333 673 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
334 : encodedDataVersionList));
335 : }
336 792 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
337 792 : if (encodedDataVersionList)
338 : {
339 80 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
340 : }
341 : else
342 : {
343 712 : request.Rollback(backup);
344 : }
345 :
346 792 : ReturnErrorOnFailure(request.EndOfReadRequestMessage());
347 792 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
348 :
349 792 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
350 :
351 792 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
352 792 : VerifyOrReturnError(exchange != nullptr, err = CHIP_ERROR_NO_MEMORY);
353 :
354 792 : mExchange.Grab(exchange);
355 :
356 792 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
357 : {
358 792 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
359 : }
360 : else
361 : {
362 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
363 : }
364 :
365 792 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf),
366 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
367 :
368 792 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
369 792 : MoveToState(ClientState::AwaitingInitialReport);
370 :
371 792 : return CHIP_NO_ERROR;
372 792 : }
373 :
374 343 : CHIP_ERROR ReadClient::GenerateEventPaths(EventPathIBs::Builder & aEventPathsBuilder, const Span<EventPathParams> & aEventPaths)
375 : {
376 854 : for (auto & event : aEventPaths)
377 : {
378 511 : VerifyOrReturnError(event.IsValidEventPath(), CHIP_ERROR_IM_MALFORMED_EVENT_PATH_IB);
379 511 : EventPathIB::Builder & path = aEventPathsBuilder.CreatePath();
380 511 : ReturnErrorOnFailure(aEventPathsBuilder.GetError());
381 511 : ReturnErrorOnFailure(path.Encode(event));
382 : }
383 :
384 343 : return aEventPathsBuilder.EndOfEventPaths();
385 : }
386 :
387 981 : CHIP_ERROR ReadClient::GenerateAttributePaths(AttributePathIBs::Builder & aAttributePathIBsBuilder,
388 : const Span<AttributePathParams> & aAttributePaths)
389 : {
390 2976 : for (auto & attribute : aAttributePaths)
391 : {
392 1997 : VerifyOrReturnError(attribute.IsValidAttributePath(), CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
393 1995 : AttributePathIB::Builder & path = aAttributePathIBsBuilder.CreatePath();
394 1995 : ReturnErrorOnFailure(aAttributePathIBsBuilder.GetError());
395 1995 : ReturnErrorOnFailure(path.Encode(attribute));
396 : }
397 :
398 979 : return aAttributePathIBsBuilder.EndOfAttributePathIBs();
399 : }
400 :
401 972 : CHIP_ERROR ReadClient::BuildDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
402 : const Span<AttributePathParams> & aAttributePaths,
403 : const Span<DataVersionFilter> & aDataVersionFilters,
404 : bool & aEncodedDataVersionList)
405 : {
406 : #if CHIP_PROGRESS_LOGGING
407 972 : size_t encodedFilterCount = 0;
408 972 : size_t irrelevantFilterCount = 0;
409 972 : size_t skippedFilterCount = 0;
410 : #endif
411 3465 : for (auto & filter : aDataVersionFilters)
412 : {
413 2494 : VerifyOrReturnError(filter.IsValidDataVersionFilter(), CHIP_ERROR_INVALID_ARGUMENT);
414 :
415 : // If data version filter is for some cluster none of whose attributes are included in our paths, discard this filter.
416 2494 : bool intersected = false;
417 2506 : for (auto & path : aAttributePaths)
418 : {
419 2500 : if (path.IncludesAttributesInCluster(filter))
420 : {
421 2488 : intersected = true;
422 2488 : break;
423 : }
424 : }
425 :
426 2494 : if (!intersected)
427 : {
428 : #if CHIP_PROGRESS_LOGGING
429 6 : ++irrelevantFilterCount;
430 : #endif
431 6 : continue;
432 : }
433 :
434 2488 : TLV::TLVWriter backup;
435 2488 : aDataVersionFilterIBsBuilder.Checkpoint(backup);
436 2488 : CHIP_ERROR err = aDataVersionFilterIBsBuilder.EncodeDataVersionFilterIB(filter);
437 2488 : if (err == CHIP_NO_ERROR)
438 : {
439 : #if CHIP_PROGRESS_LOGGING
440 2487 : ++encodedFilterCount;
441 : #endif
442 2487 : aEncodedDataVersionList = true;
443 : }
444 1 : else if (err == CHIP_ERROR_NO_MEMORY || err == CHIP_ERROR_BUFFER_TOO_SMALL)
445 : {
446 : // Packet is full, ignore the rest of the list
447 1 : aDataVersionFilterIBsBuilder.Rollback(backup);
448 : #if CHIP_PROGRESS_LOGGING
449 1 : ssize_t nonSkippedFilterCount = &filter - aDataVersionFilters.data();
450 1 : skippedFilterCount = aDataVersionFilters.size() - static_cast<size_t>(nonSkippedFilterCount);
451 : #endif // CHIP_PROGRESS_LOGGING
452 1 : break;
453 : }
454 : else
455 : {
456 0 : return err;
457 : }
458 : }
459 :
460 972 : ChipLogProgress(DataManagement,
461 : "%lu data version filters provided, %lu not relevant, %lu encoded, %lu skipped due to lack of space",
462 : static_cast<unsigned long>(aDataVersionFilters.size()), static_cast<unsigned long>(irrelevantFilterCount),
463 : static_cast<unsigned long>(encodedFilterCount), static_cast<unsigned long>(skippedFilterCount));
464 972 : return CHIP_NO_ERROR;
465 : }
466 :
467 977 : CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
468 : const Span<AttributePathParams> & aAttributePaths,
469 : const Span<DataVersionFilter> & aDataVersionFilters,
470 : bool & aEncodedDataVersionList)
471 : {
472 : // Give the callback a chance first, otherwise use the list we have, if any.
473 977 : ReturnErrorOnFailure(
474 : mpCallback.OnUpdateDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aEncodedDataVersionList));
475 :
476 977 : if (!aEncodedDataVersionList)
477 : {
478 972 : ReturnErrorOnFailure(BuildDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aDataVersionFilters,
479 : aEncodedDataVersionList));
480 : }
481 :
482 977 : return CHIP_NO_ERROR;
483 : }
484 :
485 3 : void ReadClient::OnActiveModeNotification()
486 : {
487 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
488 :
489 : // Note: this API only works when issuing subscription via SendAutoResubscribeRequest. When SendAutoResubscribeRequest is
490 : // called, either mEventPathParamsListSize or mAttributePathParamsListSize is not 0.
491 3 : VerifyOrReturn(mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0);
492 :
493 : // When we reach here, the subscription definitely exceeded the liveness timeout. Just continue the unfinished resubscription
494 : // logic in `OnLivenessTimeoutCallback`.
495 3 : if (IsInactiveICDSubscription())
496 : {
497 1 : TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
498 1 : return;
499 : }
500 :
501 : // If this API has been called, that means the subscription for this ReadClient is gone
502 : // on the server side (because otherwise the server would not have checked in with us).
503 : // Even if we think we have a live subscription, we are wrong, and should just forcibly time it
504 : // out and schedule a new one.
505 2 : if (!mIsResubscriptionScheduled)
506 : {
507 : // Closing will ultimately trigger ScheduleResubscription with the aReestablishCASE argument set to true, effectively
508 : // rendering the session defunct.
509 1 : Close(CHIP_ERROR_TIMEOUT);
510 1 : return;
511 : }
512 :
513 : // If we have already detected subscription loss and are waiting to try to re-subscribe,
514 : // now is a really good time to do it, since the server is listening.
515 1 : TriggerResubscribeIfScheduled("check-in message");
516 : }
517 :
518 3 : void ReadClient::OnPeerTypeChange(PeerType aType)
519 : {
520 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
521 :
522 3 : mIsPeerLIT = (aType == PeerType::kLITICD);
523 :
524 3 : ChipLogProgress(DataManagement, "Peer is now %s LIT ICD.", mIsPeerLIT ? "a" : "not a");
525 :
526 : // If the peer is no longer LIT and we were waiting for a check-in to try to resubscribe,
527 : // just try to resubscribe now, because a SIT is not going to send a check-in.
528 3 : if (!mIsPeerLIT && IsInactiveICDSubscription())
529 : {
530 0 : TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
531 : }
532 3 : }
533 :
534 2184 : CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
535 : System::PacketBufferHandle && aPayload)
536 : {
537 2184 : CHIP_ERROR err = CHIP_NO_ERROR;
538 2184 : Status status = Status::InvalidAction;
539 2184 : VerifyOrExit(!IsIdle() && !IsInactiveICDSubscription(), err = CHIP_ERROR_INCORRECT_STATE);
540 :
541 2184 : if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::ReportData))
542 : {
543 1832 : err = ProcessReportData(std::move(aPayload), ReportType::kContinuingTransaction);
544 : }
545 352 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::SubscribeResponse))
546 : {
547 282 : ChipLogProgress(DataManagement, "SubscribeResponse is received");
548 282 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
549 282 : err = ProcessSubscribeResponse(std::move(aPayload));
550 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
551 : }
552 70 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::StatusResponse))
553 : {
554 138 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
555 70 : CHIP_ERROR statusError = CHIP_NO_ERROR;
556 70 : SuccessOrExit(err = StatusResponse::ProcessStatusResponse(std::move(aPayload), statusError));
557 70 : SuccessOrExit(err = statusError);
558 2 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
559 : }
560 : else
561 : {
562 0 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
563 : }
564 :
565 2184 : exit:
566 2184 : if (err != CHIP_NO_ERROR)
567 : {
568 76 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
569 : {
570 4 : status = Status::InvalidSubscription;
571 : }
572 76 : StatusResponse::Send(status, apExchangeContext, false /*aExpectResponse*/);
573 : }
574 :
575 2184 : if ((!IsSubscriptionType() && !mPendingMoreChunks) || err != CHIP_NO_ERROR)
576 : {
577 767 : Close(err);
578 : }
579 :
580 2184 : return err;
581 : }
582 :
583 82 : void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload)
584 : {
585 82 : Status status = Status::Success;
586 82 : mExchange.Grab(apExchangeContext);
587 :
588 : //
589 : // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
590 : // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
591 : // session to us, of which there could be multiple.
592 : //
593 : // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
594 : // to maximize our chances of success later.
595 : //
596 82 : mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());
597 :
598 82 : CHIP_ERROR err = ProcessReportData(std::move(aPayload), ReportType::kUnsolicited);
599 82 : if (err != CHIP_NO_ERROR)
600 : {
601 0 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
602 : {
603 0 : status = Status::InvalidSubscription;
604 : }
605 : else
606 : {
607 0 : status = Status::InvalidAction;
608 : }
609 :
610 0 : StatusResponse::Send(status, mExchange.Get(), false /*aExpectResponse*/);
611 0 : Close(err);
612 : }
613 82 : }
614 :
615 1922 : CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload, ReportType aReportType)
616 : {
617 1922 : CHIP_ERROR err = CHIP_NO_ERROR;
618 1922 : ReportDataMessage::Parser report;
619 1922 : bool suppressResponse = true;
620 1922 : SubscriptionId subscriptionId = 0;
621 1922 : EventReportIBs::Parser eventReportIBs;
622 1922 : AttributeReportIBs::Parser attributeReportIBs;
623 1922 : System::PacketBufferTLVReader reader;
624 1922 : reader.Init(std::move(aPayload));
625 1922 : err = report.Init(reader);
626 1922 : SuccessOrExit(err);
627 :
628 : #if CHIP_CONFIG_IM_PRETTY_PRINT
629 1920 : if (aReportType != ReportType::kUnsolicited)
630 : {
631 1838 : report.PrettyPrint();
632 : }
633 : #endif
634 :
635 1920 : err = report.GetSuppressResponse(&suppressResponse);
636 1920 : if (CHIP_END_OF_TLV == err)
637 : {
638 1221 : suppressResponse = false;
639 1221 : err = CHIP_NO_ERROR;
640 : }
641 1920 : SuccessOrExit(err);
642 :
643 1920 : err = report.GetSubscriptionId(&subscriptionId);
644 1920 : if (CHIP_NO_ERROR == err)
645 : {
646 418 : VerifyOrExit(IsSubscriptionType(), err = CHIP_ERROR_INVALID_ARGUMENT);
647 416 : if (mWaitingForFirstPrimingReport)
648 : {
649 284 : mSubscriptionId = subscriptionId;
650 : }
651 132 : else if (!IsMatchingSubscriptionId(subscriptionId))
652 : {
653 2 : err = CHIP_ERROR_INVALID_SUBSCRIPTION;
654 : }
655 : }
656 1502 : else if (CHIP_END_OF_TLV == err)
657 : {
658 1502 : if (IsSubscriptionType())
659 : {
660 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
661 : }
662 : else
663 : {
664 1502 : err = CHIP_NO_ERROR;
665 : }
666 : }
667 1918 : SuccessOrExit(err);
668 :
669 1916 : err = report.GetMoreChunkedMessages(&mPendingMoreChunks);
670 1916 : if (CHIP_END_OF_TLV == err)
671 : {
672 1051 : mPendingMoreChunks = false;
673 1051 : err = CHIP_NO_ERROR;
674 : }
675 1916 : SuccessOrExit(err);
676 :
677 1916 : err = report.GetEventReports(&eventReportIBs);
678 1916 : if (err == CHIP_END_OF_TLV)
679 : {
680 1276 : err = CHIP_NO_ERROR;
681 : }
682 640 : else if (err == CHIP_NO_ERROR)
683 : {
684 640 : chip::TLV::TLVReader EventReportsReader;
685 640 : eventReportIBs.GetReader(&EventReportsReader);
686 640 : err = ProcessEventReportIBs(EventReportsReader);
687 : }
688 1916 : SuccessOrExit(err);
689 :
690 1916 : err = report.GetAttributeReportIBs(&attributeReportIBs);
691 1916 : if (err == CHIP_END_OF_TLV)
692 : {
693 631 : err = CHIP_NO_ERROR;
694 : }
695 1285 : else if (err == CHIP_NO_ERROR)
696 : {
697 1285 : TLV::TLVReader attributeReportIBsReader;
698 1285 : attributeReportIBs.GetReader(&attributeReportIBsReader);
699 1285 : err = ProcessAttributeReportIBs(attributeReportIBsReader);
700 : }
701 1916 : SuccessOrExit(err);
702 :
703 1914 : if (mIsReporting && !mPendingMoreChunks)
704 : {
705 1020 : mpCallback.OnReportEnd();
706 1020 : mIsReporting = false;
707 : }
708 :
709 1914 : SuccessOrExit(err = report.ExitContainer());
710 :
711 1922 : exit:
712 1922 : if (IsSubscriptionType())
713 : {
714 418 : if (IsAwaitingInitialReport())
715 : {
716 286 : MoveToState(ClientState::AwaitingSubscribeResponse);
717 : }
718 132 : else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
719 : {
720 : //
721 : // Only refresh the liveness check timer if we've successfully established
722 : // a subscription and have a valid value for mMaxInterval which the function
723 : // relies on.
724 : //
725 103 : mpCallback.NotifySubscriptionStillActive(*this);
726 103 : err = RefreshLivenessCheckTimer();
727 : }
728 : }
729 :
730 1922 : if (!suppressResponse && err == CHIP_NO_ERROR)
731 : {
732 1219 : bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks;
733 1219 : err = StatusResponse::Send(Status::Success, mExchange.Get(), !noResponseExpected);
734 : }
735 :
736 1922 : mWaitingForFirstPrimingReport = false;
737 1922 : return err;
738 1922 : }
739 :
740 12 : void ReadClient::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext)
741 : {
742 12 : ChipLogError(DataManagement, "Time out! failed to receive report data from Exchange: " ChipLogFormatExchange,
743 : ChipLogValueExchange(apExchangeContext));
744 :
745 12 : Close(CHIP_ERROR_TIMEOUT);
746 12 : }
747 :
748 5 : CHIP_ERROR ReadClient::ReadICDOperatingModeFromAttributeDataIB(TLV::TLVReader && aReader, PeerType & aType)
749 : {
750 : Clusters::IcdManagement::Attributes::OperatingMode::TypeInfo::DecodableType operatingMode;
751 :
752 5 : CHIP_ERROR err = DataModel::Decode(aReader, operatingMode);
753 5 : ReturnErrorOnFailure(err);
754 :
755 5 : switch (operatingMode)
756 : {
757 4 : case Clusters::IcdManagement::OperatingModeEnum::kSit:
758 4 : aType = PeerType::kNormal;
759 4 : break;
760 1 : case Clusters::IcdManagement::OperatingModeEnum::kLit:
761 1 : aType = PeerType::kLITICD;
762 1 : break;
763 0 : default:
764 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
765 0 : break;
766 : }
767 :
768 5 : return err;
769 : }
770 :
771 4432 : CHIP_ERROR ReadClient::ProcessAttributePath(AttributePathIB::Parser & aAttributePathParser,
772 : ConcreteDataAttributePath & aAttributePath)
773 : {
774 : // The ReportData must contain a concrete attribute path. Don't validate ID
775 : // ranges here, so we can tell apart "malformed data" and "out of range
776 : // IDs".
777 4432 : CHIP_ERROR err = CHIP_NO_ERROR;
778 : // The ReportData must contain a concrete attribute path
779 4432 : err = aAttributePathParser.GetConcreteAttributePath(aAttributePath, AttributePathIB::ValidateIdRanges::kNo);
780 4432 : VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
781 4430 : return CHIP_NO_ERROR;
782 : }
783 :
784 6007 : void ReadClient::NoteReportingData()
785 : {
786 6007 : if (!mIsReporting)
787 : {
788 1074 : mpCallback.OnReportBegin();
789 1074 : mIsReporting = true;
790 : }
791 6007 : }
792 :
793 1285 : CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeReportIBsReader)
794 : {
795 1285 : CHIP_ERROR err = CHIP_NO_ERROR;
796 5715 : while (CHIP_NO_ERROR == (err = aAttributeReportIBsReader.Next()))
797 : {
798 4432 : TLV::TLVReader dataReader;
799 4432 : AttributeReportIB::Parser report;
800 4432 : AttributeDataIB::Parser data;
801 4432 : AttributeStatusIB::Parser status;
802 4432 : AttributePathIB::Parser path;
803 4432 : ConcreteDataAttributePath attributePath;
804 4432 : StatusIB statusIB;
805 :
806 4432 : TLV::TLVReader reader = aAttributeReportIBsReader;
807 4432 : ReturnErrorOnFailure(report.Init(reader));
808 :
809 4432 : err = report.GetAttributeStatus(&status);
810 4432 : if (CHIP_NO_ERROR == err)
811 : {
812 8 : StatusIB::Parser errorStatus;
813 8 : ReturnErrorOnFailure(status.GetPath(&path));
814 8 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
815 8 : if (!attributePath.IsValid())
816 : {
817 : // Don't fail the entire read or subscription when there is an
818 : // out-of-range ID. Just skip that one AttributeReportIB.
819 0 : ChipLogError(DataManagement,
820 : "Skipping AttributeStatusIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
821 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
822 : ChipLogValueMEI(attributePath.mAttributeId));
823 0 : continue;
824 : }
825 :
826 8 : ReturnErrorOnFailure(status.GetErrorStatus(&errorStatus));
827 8 : ReturnErrorOnFailure(errorStatus.DecodeStatusIB(statusIB));
828 8 : NoteReportingData();
829 8 : mpCallback.OnAttributeData(attributePath, nullptr, statusIB);
830 : }
831 4424 : else if (CHIP_END_OF_TLV == err)
832 : {
833 4424 : ReturnErrorOnFailure(report.GetAttributeData(&data));
834 4424 : ReturnErrorOnFailure(data.GetPath(&path));
835 4424 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
836 4422 : if (!attributePath.IsValid())
837 : {
838 : // Don't fail the entire read or subscription when there is an
839 : // out-of-range ID. Just skip that one AttributeReportIB.
840 2 : ChipLogError(DataManagement,
841 : "Skipping AttributeDataIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
842 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
843 : ChipLogValueMEI(attributePath.mAttributeId));
844 2 : continue;
845 : }
846 :
847 4420 : DataVersion version = 0;
848 4420 : ReturnErrorOnFailure(data.GetDataVersion(&version));
849 4420 : attributePath.mDataVersion.SetValue(version);
850 :
851 4420 : if (mReadPrepareParams.mpDataVersionFilterList != nullptr)
852 : {
853 65 : UpdateDataVersionFilters(attributePath);
854 : }
855 :
856 4420 : ReturnErrorOnFailure(data.GetData(&dataReader));
857 :
858 : // The element in an array may be another array -- so we should only set the list operation when we are handling the
859 : // whole list.
860 4420 : if (!attributePath.IsListOperation() && dataReader.GetType() == TLV::kTLVType_Array)
861 : {
862 2192 : attributePath.mListOp = ConcreteDataAttributePath::ListOperation::ReplaceAll;
863 : }
864 :
865 4420 : if (attributePath.MatchesConcreteAttributePath(ConcreteAttributePath(
866 : kRootEndpointId, Clusters::IcdManagement::Id, Clusters::IcdManagement::Attributes::OperatingMode::Id)))
867 : {
868 : PeerType peerType;
869 5 : TLV::TLVReader operatingModeTlvReader;
870 5 : operatingModeTlvReader.Init(dataReader);
871 5 : if (CHIP_NO_ERROR == ReadICDOperatingModeFromAttributeDataIB(std::move(operatingModeTlvReader), peerType))
872 : {
873 : // It is safe to call `OnPeerTypeChange` since we are in the middle of parsing the attribute data, And
874 : // the subscription should be active so `OnActiveModeNotification` is a no-op in this case.
875 5 : InteractionModelEngine::GetInstance()->OnPeerTypeChange(mPeer, peerType);
876 : }
877 : else
878 : {
879 0 : ChipLogError(DataManagement, "Failed to get ICD state from attribute data with error'%" CHIP_ERROR_FORMAT "'",
880 : err.Format());
881 : }
882 : }
883 :
884 4420 : NoteReportingData();
885 4420 : mpCallback.OnAttributeData(attributePath, &dataReader, statusIB);
886 : }
887 : }
888 :
889 1283 : if (CHIP_END_OF_TLV == err)
890 : {
891 1283 : err = CHIP_NO_ERROR;
892 : }
893 :
894 1283 : return err;
895 : }
896 :
897 640 : CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader)
898 : {
899 640 : CHIP_ERROR err = CHIP_NO_ERROR;
900 2219 : while (CHIP_NO_ERROR == (err = aEventReportIBsReader.Next()))
901 : {
902 1579 : TLV::TLVReader dataReader;
903 1579 : EventReportIB::Parser report;
904 1579 : EventDataIB::Parser data;
905 1579 : EventHeader header;
906 1579 : StatusIB statusIB; // Default value for statusIB is success.
907 :
908 1579 : TLV::TLVReader reader = aEventReportIBsReader;
909 1579 : ReturnErrorOnFailure(report.Init(reader));
910 :
911 1579 : err = report.GetEventData(&data);
912 :
913 1579 : if (err == CHIP_NO_ERROR)
914 : {
915 1575 : header.mTimestamp = mEventTimestamp;
916 1575 : ReturnErrorOnFailure(data.DecodeEventHeader(header));
917 1575 : mEventTimestamp = header.mTimestamp;
918 :
919 1575 : ReturnErrorOnFailure(data.GetData(&dataReader));
920 :
921 : //
922 : // Update the event number being tracked in mReadPrepareParams in case
923 : // we want to send it in the next SubscribeRequest message to convey
924 : // the event number for which we have already received an event.
925 : //
926 1575 : mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1);
927 :
928 1575 : NoteReportingData();
929 1575 : mpCallback.OnEventData(header, &dataReader, nullptr);
930 : }
931 4 : else if (err == CHIP_END_OF_TLV)
932 : {
933 4 : EventStatusIB::Parser status;
934 4 : EventPathIB::Parser pathIB;
935 4 : StatusIB::Parser statusIBParser;
936 4 : ReturnErrorOnFailure(report.GetEventStatus(&status));
937 4 : ReturnErrorOnFailure(status.GetPath(&pathIB));
938 4 : ReturnErrorOnFailure(pathIB.GetEventPath(&header.mPath));
939 4 : ReturnErrorOnFailure(status.GetErrorStatus(&statusIBParser));
940 4 : ReturnErrorOnFailure(statusIBParser.DecodeStatusIB(statusIB));
941 :
942 4 : NoteReportingData();
943 4 : mpCallback.OnEventData(header, nullptr, &statusIB);
944 : }
945 : }
946 :
947 640 : if (CHIP_END_OF_TLV == err)
948 : {
949 640 : err = CHIP_NO_ERROR;
950 : }
951 :
952 640 : return err;
953 : }
954 :
955 0 : void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
956 : {
957 0 : mLivenessTimeoutOverride = aLivenessTimeout;
958 0 : auto err = RefreshLivenessCheckTimer();
959 0 : if (err != CHIP_NO_ERROR)
960 : {
961 0 : Close(err);
962 : }
963 0 : }
964 :
965 383 : CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
966 : {
967 383 : CHIP_ERROR err = CHIP_NO_ERROR;
968 :
969 383 : VerifyOrReturnError(IsSubscriptionActive(), CHIP_ERROR_INCORRECT_STATE);
970 :
971 383 : CancelLivenessCheckTimer();
972 :
973 : System::Clock::Timeout timeout;
974 383 : ReturnErrorOnFailure(ComputeLivenessCheckTimerTimeout(&timeout));
975 :
976 : // EFR32/MBED/INFINION/K32W's chrono count return long unsigned, but other platform returns unsigned
977 383 : ChipLogProgress(
978 : DataManagement,
979 : "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64,
980 : static_cast<long unsigned>(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()));
981 383 : err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
982 : timeout, OnLivenessTimeoutCallback, this);
983 :
984 383 : return err;
985 : }
986 :
987 383 : CHIP_ERROR ReadClient::ComputeLivenessCheckTimerTimeout(System::Clock::Timeout * aTimeout)
988 : {
989 383 : if (mLivenessTimeoutOverride != System::Clock::kZero)
990 : {
991 0 : *aTimeout = mLivenessTimeoutOverride;
992 0 : return CHIP_NO_ERROR;
993 : }
994 :
995 383 : VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE);
996 :
997 : //
998 : // To calculate the duration we're willing to wait for a report to come to us, we take into account the maximum interval of
999 : // the subscription AND the time it takes for the report to make it to us in the worst case.
1000 : //
1001 : // We have no way to estimate what the network latency will be, but we do know the other side will time out its ReportData
1002 : // after its computed round-trip timeout plus the processing time it gives us (app::kExpectedIMProcessingTime). Once it
1003 : // times out, assuming it sent the report at all, there's no point in us thinking we still have a subscription.
1004 : //
1005 : // We can't use ComputeRoundTripTimeout() on the session for two reasons: we want the roundtrip timeout from the point of
1006 : // view of the peer, not us, and we want to start off with the assumption the peer will likely have, which is that we are
1007 : // idle, whereas ComputeRoundTripTimeout() uses the current activity state of the peer.
1008 : //
1009 : // So recompute the round-trip timeout directly. Assume MRP, since in practice that is likely what is happening.
1010 383 : auto & peerMRPConfig = mReadPrepareParams.mSessionHolder->GetRemoteMRPConfig();
1011 : // Peer will assume we are idle (hence we pass kZero to GetMessageReceiptTimeout()), but will assume we treat it as active
1012 : // for the response, so to match the retransmission timeout computation for the message back to the peer, we should treat
1013 : // it as active and handling non-initial message, isFirstMessageOnExchange needs to be set as false for
1014 : // GetRetransmissionTimeout.
1015 : auto roundTripTimeout =
1016 383 : mReadPrepareParams.mSessionHolder->GetMessageReceiptTimeout(System::Clock::kZero, true /*isFirstMessageOnExchange*/) +
1017 383 : kExpectedIMProcessingTime +
1018 383 : GetRetransmissionTimeout(peerMRPConfig.mActiveRetransTimeout, peerMRPConfig.mIdleRetransTimeout,
1019 383 : System::SystemClock().GetMonotonicTimestamp(), peerMRPConfig.mActiveThresholdTime,
1020 383 : false /*isFirstMessageOnExchange*/);
1021 383 : *aTimeout = System::Clock::Seconds16(mMaxInterval) + roundTripTimeout;
1022 383 : return CHIP_NO_ERROR;
1023 : }
1024 :
1025 967 : void ReadClient::CancelLivenessCheckTimer()
1026 : {
1027 967 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1028 : OnLivenessTimeoutCallback, this);
1029 967 : }
1030 :
1031 585 : void ReadClient::CancelResubscribeTimer()
1032 : {
1033 585 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1034 : OnResubscribeTimerCallback, this);
1035 585 : mIsResubscriptionScheduled = false;
1036 585 : }
1037 :
1038 7 : void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
1039 : {
1040 7 : ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState);
1041 :
1042 : // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e
1043 : // response timeouts).
1044 7 : CHIP_ERROR subscriptionTerminationCause = CHIP_ERROR_TIMEOUT;
1045 :
1046 : //
1047 : // Might as well try to see if this instance exists in the tracked list in the IM.
1048 : // This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since
1049 : // been shutdown at which point the client wouldn't exist in the active read client list.
1050 : //
1051 7 : VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this));
1052 :
1053 7 : ChipLogError(DataManagement,
1054 : "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64,
1055 : _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId()));
1056 :
1057 7 : if (_this->mIsPeerLIT)
1058 : {
1059 3 : subscriptionTerminationCause = CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1060 : }
1061 :
1062 7 : _this->TriggerResubscriptionForLivenessTimeout(subscriptionTerminationCause);
1063 7 : }
1064 :
1065 8 : void ReadClient::TriggerResubscriptionForLivenessTimeout(CHIP_ERROR aReason)
1066 : {
1067 : // We didn't get a message from the server on time; it's possible that it no
1068 : // longer has a useful CASE session to us. Mark defunct all sessions that
1069 : // have not seen peer activity in at least as long as our session.
1070 8 : const auto & holder = mReadPrepareParams.mSessionHolder;
1071 8 : if (holder)
1072 : {
1073 8 : System::Clock::Timestamp lastPeerActivity = holder->AsSecureSession()->GetLastPeerActivityTime();
1074 8 : mpImEngine->GetExchangeManager()->GetSessionManager()->ForEachMatchingSession(mPeer, [&lastPeerActivity](auto * session) {
1075 8 : if (!session->IsCASESession())
1076 : {
1077 8 : return;
1078 : }
1079 :
1080 0 : if (session->GetLastPeerActivityTime() > lastPeerActivity)
1081 : {
1082 0 : return;
1083 : }
1084 :
1085 0 : session->MarkAsDefunct();
1086 : });
1087 : }
1088 :
1089 8 : Close(aReason);
1090 8 : }
1091 :
1092 282 : CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload)
1093 : {
1094 282 : System::PacketBufferTLVReader reader;
1095 282 : reader.Init(std::move(aPayload));
1096 :
1097 282 : SubscribeResponseMessage::Parser subscribeResponse;
1098 282 : ReturnErrorOnFailure(subscribeResponse.Init(reader));
1099 :
1100 : #if CHIP_CONFIG_IM_PRETTY_PRINT
1101 282 : subscribeResponse.PrettyPrint();
1102 : #endif
1103 :
1104 282 : SubscriptionId subscriptionId = 0;
1105 282 : VerifyOrReturnError(subscribeResponse.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, CHIP_ERROR_INVALID_ARGUMENT);
1106 282 : VerifyOrReturnError(IsMatchingSubscriptionId(subscriptionId), CHIP_ERROR_INVALID_SUBSCRIPTION);
1107 280 : ReturnErrorOnFailure(subscribeResponse.GetMaxInterval(&mMaxInterval));
1108 :
1109 : #if CHIP_PROGRESS_LOGGING
1110 280 : auto duration = System::Clock::Milliseconds32(System::SystemClock().GetMonotonicTimestamp() - mSubscribeRequestTime);
1111 : #endif
1112 280 : ChipLogProgress(DataManagement,
1113 : "Subscription established in %" PRIu32 "ms with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u"
1114 : "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64,
1115 : duration.count(), mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(),
1116 : ChipLogValueX64(GetPeerNodeId()));
1117 :
1118 280 : ReturnErrorOnFailure(subscribeResponse.ExitContainer());
1119 :
1120 280 : MoveToState(ClientState::SubscriptionActive);
1121 :
1122 280 : mpCallback.OnSubscriptionEstablished(subscriptionId);
1123 :
1124 280 : mNumRetries = 0;
1125 :
1126 280 : ReturnErrorOnFailure(RefreshLivenessCheckTimer());
1127 :
1128 280 : return CHIP_NO_ERROR;
1129 282 : }
1130 :
1131 226 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams)
1132 : {
1133 : // Make sure we don't use minimal resubscribe delays from previous attempts
1134 : // for this one.
1135 226 : mMinimalResubscribeDelay = System::Clock::kZero;
1136 :
1137 226 : mReadPrepareParams = std::move(aReadPrepareParams);
1138 226 : CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams);
1139 226 : if (err != CHIP_NO_ERROR)
1140 : {
1141 1 : StopResubscription();
1142 : }
1143 226 : return err;
1144 : }
1145 :
1146 0 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(const ScopedNodeId & aPublisherId, ReadPrepareParams && aReadPrepareParams)
1147 : {
1148 0 : mPeer = aPublisherId;
1149 0 : mReadPrepareParams = std::move(aReadPrepareParams);
1150 0 : CHIP_ERROR err = EstablishSessionToPeer();
1151 0 : if (err != CHIP_NO_ERROR)
1152 : {
1153 : // Make sure we call our callback's OnDeallocatePaths.
1154 0 : StopResubscription();
1155 : }
1156 0 : return err;
1157 : }
1158 :
1159 314 : CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepareParams)
1160 : {
1161 314 : VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
1162 : CHIP_ERROR_INVALID_ARGUMENT);
1163 :
1164 311 : auto err = SendSubscribeRequestImpl(aReadPrepareParams);
1165 311 : if (CHIP_NO_ERROR != err)
1166 : {
1167 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
1168 : }
1169 311 : return err;
1170 : }
1171 :
1172 311 : CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadPrepareParams)
1173 : {
1174 : MATTER_LOG_METRIC_BEGIN(Tracing::kMetricDeviceSubscriptionSetup);
1175 :
1176 : #if CHIP_PROGRESS_LOGGING
1177 311 : mSubscribeRequestTime = System::SystemClock().GetMonotonicTimestamp();
1178 : #endif
1179 :
1180 311 : VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);
1181 :
1182 311 : if (&aReadPrepareParams != &mReadPrepareParams)
1183 : {
1184 76 : mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
1185 : }
1186 :
1187 311 : mIsPeerLIT = aReadPrepareParams.mIsPeerLIT;
1188 :
1189 311 : mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;
1190 :
1191 : // Todo: Remove the below, Update span in ReadPrepareParams
1192 311 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
1193 311 : aReadPrepareParams.mAttributePathParamsListSize);
1194 311 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
1195 311 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
1196 311 : aReadPrepareParams.mDataVersionFilterListSize);
1197 :
1198 311 : System::PacketBufferHandle msgBuf;
1199 311 : System::PacketBufferTLVWriter writer;
1200 311 : SubscribeRequestMessage::Builder request;
1201 311 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
1202 :
1203 311 : ReturnErrorOnFailure(request.Init(&writer));
1204 :
1205 311 : request.KeepSubscriptions(aReadPrepareParams.mKeepSubscriptions)
1206 311 : .MinIntervalFloorSeconds(aReadPrepareParams.mMinIntervalFloorSeconds)
1207 311 : .MaxIntervalCeilingSeconds(aReadPrepareParams.mMaxIntervalCeilingSeconds);
1208 :
1209 311 : if (!attributePaths.empty())
1210 : {
1211 304 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
1212 304 : ReturnErrorOnFailure(attributePathListBuilder.GetError());
1213 304 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
1214 : }
1215 :
1216 311 : if (!eventPaths.empty())
1217 : {
1218 22 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
1219 22 : ReturnErrorOnFailure(eventPathListBuilder.GetError());
1220 22 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
1221 :
1222 22 : Optional<EventNumber> eventMin;
1223 22 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
1224 22 : if (eventMin.HasValue())
1225 : {
1226 0 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
1227 0 : ReturnErrorOnFailure(request.GetError());
1228 0 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
1229 : }
1230 : }
1231 :
1232 311 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
1233 :
1234 311 : bool encodedDataVersionList = false;
1235 311 : TLV::TLVWriter backup;
1236 311 : request.Checkpoint(backup);
1237 311 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
1238 311 : ReturnErrorOnFailure(request.GetError());
1239 311 : if (!attributePaths.empty())
1240 : {
1241 304 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
1242 : encodedDataVersionList));
1243 : }
1244 311 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
1245 311 : if (encodedDataVersionList)
1246 : {
1247 65 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
1248 : }
1249 : else
1250 : {
1251 246 : request.Rollback(backup);
1252 : }
1253 :
1254 311 : ReturnErrorOnFailure(request.EndOfSubscribeRequestMessage());
1255 311 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
1256 :
1257 311 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
1258 :
1259 311 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
1260 311 : if (exchange == nullptr)
1261 : {
1262 0 : if (aReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1263 : {
1264 0 : return CHIP_ERROR_NO_MEMORY;
1265 : }
1266 :
1267 : // Trying to subscribe with a defunct session somehow.
1268 0 : return CHIP_ERROR_INCORRECT_STATE;
1269 : }
1270 :
1271 311 : mExchange.Grab(exchange);
1272 :
1273 311 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
1274 : {
1275 311 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
1276 : }
1277 : else
1278 : {
1279 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
1280 : }
1281 :
1282 311 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
1283 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
1284 :
1285 311 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
1286 :
1287 311 : MoveToState(ClientState::AwaitingInitialReport);
1288 :
1289 311 : return CHIP_NO_ERROR;
1290 311 : }
1291 :
1292 6 : CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause)
1293 : {
1294 6 : if (aTerminationCause == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
1295 : {
1296 0 : ChipLogProgress(DataManagement, "ICD device is inactive, skipping scheduling resubscribe within DefaultResubscribePolicy");
1297 0 : return CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1298 : }
1299 :
1300 6 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
1301 :
1302 6 : auto timeTillNextResubscription = ComputeTimeTillNextSubscription();
1303 6 : ChipLogProgress(DataManagement,
1304 : "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
1305 : "ms due to error %" CHIP_ERROR_FORMAT,
1306 : GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription,
1307 : aTerminationCause.Format());
1308 6 : return ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT);
1309 : }
1310 :
1311 0 : void ReadClient::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
1312 : const SessionHandle & sessionHandle)
1313 : {
1314 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1315 0 : VerifyOrDie(_this != nullptr);
1316 :
1317 0 : ChipLogProgress(DataManagement, "HandleDeviceConnected");
1318 0 : _this->mReadPrepareParams.mSessionHolder.Grab(sessionHandle);
1319 0 : _this->mpExchangeMgr = &exchangeMgr;
1320 :
1321 0 : _this->mpCallback.OnCASESessionEstablished(sessionHandle, _this->mReadPrepareParams);
1322 :
1323 0 : auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1324 0 : if (err != CHIP_NO_ERROR)
1325 : {
1326 0 : _this->Close(err);
1327 : }
1328 0 : }
1329 :
1330 0 : void ReadClient::HandleDeviceConnectionFailure(void * context, const OperationalSessionSetup::ConnectionFailureInfo & failureInfo)
1331 : {
1332 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1333 0 : VerifyOrDie(_this != nullptr);
1334 :
1335 0 : ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'",
1336 : failureInfo.error.Format());
1337 :
1338 : #if CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1339 : #if CHIP_DETAIL_LOGGING
1340 0 : if (failureInfo.requestedBusyDelay.HasValue())
1341 : {
1342 0 : ChipLogDetail(DataManagement, "Will delay resubscription by %u ms due to BUSY response",
1343 : failureInfo.requestedBusyDelay.Value().count());
1344 : }
1345 : #endif // CHIP_DETAIL_LOGGING
1346 0 : _this->mMinimalResubscribeDelay = failureInfo.requestedBusyDelay.ValueOr(System::Clock::kZero);
1347 : #else
1348 : _this->mMinimalResubscribeDelay = System::Clock::kZero;
1349 : #endif // CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1350 :
1351 0 : _this->Close(failureInfo.error);
1352 0 : }
1353 :
1354 10 : void ReadClient::OnResubscribeTimerCallback(System::Layer * /* If this starts being used, fix callers that pass nullptr */,
1355 : void * apAppState)
1356 : {
1357 10 : ReadClient * const _this = static_cast<ReadClient *>(apAppState);
1358 10 : VerifyOrDie(_this != nullptr);
1359 :
1360 10 : _this->mIsResubscriptionScheduled = false;
1361 :
1362 : CHIP_ERROR err;
1363 :
1364 10 : ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
1365 10 : _this->mNumRetries++;
1366 :
1367 10 : bool allowResubscribeOnError = true;
1368 20 : if (!_this->mReadPrepareParams.mSessionHolder ||
1369 10 : !_this->mReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1370 : {
1371 : // We don't have an active CASE session. We need to go ahead and set
1372 : // one up, if we can.
1373 0 : if (_this->EstablishSessionToPeer() == CHIP_NO_ERROR)
1374 : {
1375 0 : return;
1376 : }
1377 :
1378 0 : if (_this->mForceCaseOnNextResub)
1379 : {
1380 : // Caller asked us to force CASE but we have no way to do CASE.
1381 : // Just stop trying.
1382 0 : allowResubscribeOnError = false;
1383 : }
1384 :
1385 : // No way to send our subscribe request.
1386 0 : err = CHIP_ERROR_INCORRECT_STATE;
1387 0 : ExitNow();
1388 : }
1389 :
1390 10 : err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1391 :
1392 10 : exit:
1393 10 : if (err != CHIP_NO_ERROR)
1394 : {
1395 : //
1396 : // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid
1397 : // CASESessionManager pointer when mForceCaseOnNextResub was true.
1398 : //
1399 : // In that case, don't permit re-subscription to occur.
1400 : //
1401 0 : _this->Close(err, allowResubscribeOnError);
1402 : }
1403 : }
1404 :
1405 65 : void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath)
1406 : {
1407 130 : for (size_t index = 0; index < mReadPrepareParams.mDataVersionFilterListSize; index++)
1408 : {
1409 65 : if (mReadPrepareParams.mpDataVersionFilterList[index].mEndpointId == aPath.mEndpointId &&
1410 65 : mReadPrepareParams.mpDataVersionFilterList[index].mClusterId == aPath.mClusterId)
1411 : {
1412 : // Now we know the current version for this cluster is aPath.mDataVersion.
1413 65 : mReadPrepareParams.mpDataVersionFilterList[index].mDataVersion = aPath.mDataVersion;
1414 : }
1415 : }
1416 65 : }
1417 :
1418 339 : CHIP_ERROR ReadClient::GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin)
1419 : {
1420 339 : if (aReadPrepareParams.mEventNumber.HasValue())
1421 : {
1422 312 : aEventMin = aReadPrepareParams.mEventNumber;
1423 : }
1424 : else
1425 : {
1426 27 : ReturnErrorOnFailure(mpCallback.GetHighestReceivedEventNumber(aEventMin));
1427 27 : if (aEventMin.HasValue())
1428 : {
1429 : // We want to start with the first event _after_ the last one we received.
1430 2 : aEventMin.SetValue(aEventMin.Value() + 1);
1431 : }
1432 : }
1433 339 : return CHIP_NO_ERROR;
1434 : }
1435 :
1436 196 : void ReadClient::TriggerResubscribeIfScheduled(const char * reason)
1437 : {
1438 196 : if (!mIsResubscriptionScheduled)
1439 : {
1440 195 : return;
1441 : }
1442 :
1443 1 : ChipLogDetail(DataManagement, "ReadClient[%p] triggering resubscribe, reason: %s", this, reason);
1444 1 : CancelResubscribeTimer();
1445 1 : OnResubscribeTimerCallback(nullptr, this);
1446 : }
1447 :
1448 0 : Optional<System::Clock::Timeout> ReadClient::GetSubscriptionTimeout()
1449 : {
1450 0 : if (!IsSubscriptionType() || !IsSubscriptionActive())
1451 : {
1452 0 : return NullOptional;
1453 : }
1454 :
1455 : System::Clock::Timeout timeout;
1456 0 : CHIP_ERROR err = ComputeLivenessCheckTimerTimeout(&timeout);
1457 0 : if (err != CHIP_NO_ERROR)
1458 : {
1459 0 : return NullOptional;
1460 : }
1461 :
1462 0 : return MakeOptional(timeout);
1463 : }
1464 :
1465 0 : CHIP_ERROR ReadClient::EstablishSessionToPeer()
1466 : {
1467 0 : ChipLogProgress(DataManagement, "Trying to establish a CASE session for subscription");
1468 0 : auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
1469 0 : VerifyOrReturnError(caseSessionManager != nullptr, CHIP_ERROR_INCORRECT_STATE);
1470 0 : caseSessionManager->FindOrEstablishSession(mPeer, &mOnConnectedCallback, &mOnConnectionFailureCallback);
1471 0 : return CHIP_NO_ERROR;
1472 : }
1473 :
1474 : } // namespace app
1475 : } // namespace chip
|