Line data Source code
1 : /*
2 : *
3 : * Copyright (c) 2021 Project CHIP Authors
4 : * All rights reserved.
5 : *
6 : * Licensed under the Apache License, Version 2.0 (the "License");
7 : * you may not use this file except in compliance with the License.
8 : * You may obtain a copy of the License at
9 : *
10 : * http://www.apache.org/licenses/LICENSE-2.0
11 : *
12 : * Unless required by applicable law or agreed to in writing, software
13 : * distributed under the License is distributed on an "AS IS" BASIS,
14 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 : * See the License for the specific language governing permissions and
16 : * limitations under the License.
17 : */
18 :
19 : /**
20 : * @file
21 : * This file defines the initiator side of a CHIP Read Interaction.
22 : *
23 : */
24 :
25 : #include <app/AppConfig.h>
26 : #include <app/InteractionModelEngine.h>
27 : #include <app/InteractionModelHelper.h>
28 : #include <app/ReadClient.h>
29 : #include <app/StatusResponse.h>
30 : #include <assert.h>
31 : #include <lib/core/TLVTypes.h>
32 : #include <lib/support/FibonacciUtils.h>
33 : #include <messaging/ReliableMessageMgr.h>
34 : #include <messaging/ReliableMessageProtocolConfig.h>
35 : #include <platform/LockTracker.h>
36 : #include <tracing/metric_event.h>
37 :
38 : #include <app-common/zap-generated/cluster-objects.h>
39 : #include <app-common/zap-generated/ids/Attributes.h>
40 : #include <app-common/zap-generated/ids/Clusters.h>
41 :
42 : namespace chip {
43 : namespace app {
44 :
45 : using Status = Protocols::InteractionModel::Status;
46 :
47 1051 : ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback,
48 1051 : InteractionType aInteractionType) :
49 1051 : mExchange(*this),
50 1051 : mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
51 2102 : mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
52 : {
53 1051 : assertChipStackLockedByCurrentThread();
54 :
55 1051 : mpExchangeMgr = apExchangeMgr;
56 1051 : mInteractionType = aInteractionType;
57 :
58 1051 : mpImEngine = apImEngine;
59 :
60 1051 : if (aInteractionType == InteractionType::Subscribe)
61 : {
62 247 : mpImEngine->AddReadClient(this);
63 : }
64 1051 : }
65 :
66 214 : void ReadClient::ClearActiveSubscriptionState()
67 : {
68 214 : mIsReporting = false;
69 214 : mWaitingForFirstPrimingReport = true;
70 214 : mPendingMoreChunks = false;
71 214 : mMinIntervalFloorSeconds = 0;
72 214 : mMaxInterval = 0;
73 214 : mSubscriptionId = 0;
74 214 : mIsResubscriptionScheduled = false;
75 :
76 214 : MoveToState(ClientState::Idle);
77 214 : }
78 :
79 450 : void ReadClient::StopResubscription()
80 : {
81 450 : CancelLivenessCheckTimer();
82 450 : CancelResubscribeTimer();
83 :
84 : // Only deallocate the paths if they are not already deallocated.
85 450 : if (mReadPrepareParams.mpAttributePathParamsList != nullptr || mReadPrepareParams.mpEventPathParamsList != nullptr ||
86 293 : mReadPrepareParams.mpDataVersionFilterList != nullptr)
87 : {
88 157 : mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams));
89 : // Make sure we will never try to free those pointers again.
90 157 : mReadPrepareParams.mpAttributePathParamsList = nullptr;
91 157 : mReadPrepareParams.mAttributePathParamsListSize = 0;
92 157 : mReadPrepareParams.mpEventPathParamsList = nullptr;
93 157 : mReadPrepareParams.mEventPathParamsListSize = 0;
94 157 : mReadPrepareParams.mpDataVersionFilterList = nullptr;
95 157 : mReadPrepareParams.mDataVersionFilterListSize = 0;
96 : }
97 450 : }
98 :
99 1139 : ReadClient::~ReadClient()
100 : {
101 1051 : assertChipStackLockedByCurrentThread();
102 :
103 1051 : if (IsSubscriptionType())
104 : {
105 247 : StopResubscription();
106 :
107 : // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it.
108 : // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine
109 : // will point to null)
110 : //
111 247 : if (mpImEngine)
112 : {
113 82 : mpImEngine->RemoveReadClient(this);
114 : }
115 : }
116 1139 : }
117 :
118 10 : uint32_t ReadClient::ComputeTimeTillNextSubscription()
119 : {
120 10 : uint32_t maxWaitTimeInMsec = 0;
121 10 : uint32_t waitTimeInMsec = 0;
122 10 : uint32_t minWaitTimeInMsec = 0;
123 :
124 10 : if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
125 : {
126 10 : maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
127 : }
128 : else
129 : {
130 0 : maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
131 : }
132 :
133 10 : if (maxWaitTimeInMsec != 0)
134 : {
135 3 : minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
136 3 : waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
137 : }
138 :
139 10 : if (mMinimalResubscribeDelay.count() > waitTimeInMsec)
140 : {
141 0 : waitTimeInMsec = mMinimalResubscribeDelay.count();
142 : }
143 :
144 10 : return waitTimeInMsec;
145 : }
146 :
147 10 : CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional<SessionHandle> aNewSessionHandle,
148 : bool aReestablishCASE)
149 : {
150 10 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
151 :
152 : //
153 : // If we're establishing CASE, make sure we are not provided a new SessionHandle as well.
154 : //
155 10 : VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT);
156 :
157 10 : if (aNewSessionHandle.HasValue())
158 : {
159 0 : mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value());
160 : }
161 :
162 10 : mForceCaseOnNextResub = aReestablishCASE;
163 10 : if (mForceCaseOnNextResub && mReadPrepareParams.mSessionHolder)
164 : {
165 : // Mark our existing session defunct, so that we will try to
166 : // re-establish it when the timer fires (unless something re-establishes
167 : // before then).
168 0 : mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct();
169 : }
170 :
171 10 : ReturnErrorOnFailure(
172 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
173 : System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
174 10 : mIsResubscriptionScheduled = true;
175 :
176 10 : return CHIP_NO_ERROR;
177 : }
178 :
179 954 : void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription)
180 : {
181 954 : if (IsReadType())
182 : {
183 740 : if (aError != CHIP_NO_ERROR)
184 : {
185 49 : mpCallback.OnError(aError);
186 : }
187 : }
188 : else
189 : {
190 214 : if (IsAwaitingInitialReport() || IsAwaitingSubscribeResponse())
191 : {
192 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, aError);
193 : }
194 :
195 214 : ClearActiveSubscriptionState();
196 214 : if (aError != CHIP_NO_ERROR)
197 : {
198 : //
199 : // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present
200 : // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which
201 : // populates mReadPrepareParams with the values provided by the application.
202 : //
203 47 : if (allowResubscription &&
204 45 : (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0))
205 : {
206 12 : CHIP_ERROR originalReason = aError;
207 :
208 12 : aError = mpCallback.OnResubscriptionNeeded(this, aError);
209 12 : if (aError == CHIP_NO_ERROR)
210 : {
211 12 : return;
212 : }
213 2 : if (aError == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
214 : {
215 2 : VerifyOrDie(originalReason == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT);
216 2 : ChipLogProgress(DataManagement, "ICD device is inactive mark subscription as InactiveICDSubscription");
217 2 : MoveToState(ClientState::InactiveICDSubscription);
218 2 : return;
219 : }
220 : }
221 :
222 : //
223 : // Either something bad happened when requesting resubscription or the application has decided to not
224 : // continue by returning an error. Let's convey the error back up to the application
225 : // and shut everything down.
226 : //
227 35 : mpCallback.OnError(aError);
228 : }
229 :
230 202 : StopResubscription();
231 : }
232 :
233 942 : mExchange.Release();
234 :
235 942 : mpCallback.OnDone(this);
236 : }
237 :
238 1679 : const char * ReadClient::GetStateStr() const
239 : {
240 : #if CHIP_DETAIL_LOGGING
241 1679 : switch (mState)
242 : {
243 214 : case ClientState::Idle:
244 214 : return "Idle";
245 1039 : case ClientState::AwaitingInitialReport:
246 1039 : return "AwaitingInitialReport";
247 215 : case ClientState::AwaitingSubscribeResponse:
248 215 : return "AwaitingSubscribeResponse";
249 209 : case ClientState::SubscriptionActive:
250 209 : return "SubscriptionActive";
251 2 : case ClientState::InactiveICDSubscription:
252 2 : return "InactiveICDSubscription";
253 : }
254 : #endif // CHIP_DETAIL_LOGGING
255 0 : return "N/A";
256 : }
257 :
258 1679 : void ReadClient::MoveToState(const ClientState aTargetState)
259 : {
260 1679 : mState = aTargetState;
261 1679 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Moving to [%10.10s]", __func__, this, GetStateStr());
262 1679 : }
263 :
264 868 : CHIP_ERROR ReadClient::SendRequest(ReadPrepareParams & aReadPrepareParams)
265 : {
266 868 : if (mInteractionType == InteractionType::Read)
267 : {
268 792 : return SendReadRequest(aReadPrepareParams);
269 : }
270 :
271 76 : if (mInteractionType == InteractionType::Subscribe)
272 : {
273 76 : return SendSubscribeRequest(aReadPrepareParams);
274 : }
275 :
276 0 : return CHIP_ERROR_INVALID_ARGUMENT;
277 : }
278 :
279 792 : CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams)
280 : {
281 792 : CHIP_ERROR err = CHIP_NO_ERROR;
282 :
283 792 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Sending Read Request", __func__, this);
284 :
285 792 : VerifyOrReturnError(ClientState::Idle == mState, err = CHIP_ERROR_INCORRECT_STATE);
286 :
287 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
288 792 : aReadPrepareParams.mAttributePathParamsListSize);
289 792 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
290 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
291 792 : aReadPrepareParams.mDataVersionFilterListSize);
292 :
293 792 : System::PacketBufferHandle msgBuf;
294 792 : ReadRequestMessage::Builder request;
295 792 : System::PacketBufferTLVWriter writer;
296 :
297 792 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
298 792 : ReturnErrorOnFailure(request.Init(&writer));
299 :
300 792 : if (!attributePaths.empty())
301 : {
302 673 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
303 673 : ReturnErrorOnFailure(err = request.GetError());
304 673 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
305 : }
306 :
307 792 : if (!eventPaths.empty())
308 : {
309 317 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
310 317 : ReturnErrorOnFailure(err = request.GetError());
311 :
312 317 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
313 :
314 317 : Optional<EventNumber> eventMin;
315 317 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
316 317 : if (eventMin.HasValue())
317 : {
318 314 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
319 314 : ReturnErrorOnFailure(err = request.GetError());
320 314 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
321 : }
322 317 : }
323 :
324 792 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
325 :
326 792 : bool encodedDataVersionList = false;
327 792 : TLV::TLVWriter backup;
328 792 : request.Checkpoint(backup);
329 792 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
330 792 : ReturnErrorOnFailure(request.GetError());
331 792 : if (!attributePaths.empty())
332 : {
333 673 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
334 : encodedDataVersionList));
335 : }
336 792 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
337 792 : if (encodedDataVersionList)
338 : {
339 80 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
340 : }
341 : else
342 : {
343 712 : request.Rollback(backup);
344 : }
345 :
346 792 : ReturnErrorOnFailure(request.EndOfReadRequestMessage());
347 792 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
348 :
349 792 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
350 :
351 792 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
352 792 : VerifyOrReturnError(exchange != nullptr, err = CHIP_ERROR_NO_MEMORY);
353 :
354 792 : mExchange.Grab(exchange);
355 :
356 792 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
357 : {
358 792 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
359 : }
360 : else
361 : {
362 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
363 : }
364 :
365 792 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf),
366 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
367 :
368 792 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
369 792 : MoveToState(ClientState::AwaitingInitialReport);
370 :
371 792 : return CHIP_NO_ERROR;
372 792 : }
373 :
374 343 : CHIP_ERROR ReadClient::GenerateEventPaths(EventPathIBs::Builder & aEventPathsBuilder, const Span<EventPathParams> & aEventPaths)
375 : {
376 854 : for (auto & event : aEventPaths)
377 : {
378 511 : VerifyOrReturnError(event.IsValidEventPath(), CHIP_ERROR_IM_MALFORMED_EVENT_PATH_IB);
379 511 : EventPathIB::Builder & path = aEventPathsBuilder.CreatePath();
380 511 : ReturnErrorOnFailure(aEventPathsBuilder.GetError());
381 511 : ReturnErrorOnFailure(path.Encode(event));
382 : }
383 :
384 343 : return aEventPathsBuilder.EndOfEventPaths();
385 : }
386 :
387 909 : CHIP_ERROR ReadClient::GenerateAttributePaths(AttributePathIBs::Builder & aAttributePathIBsBuilder,
388 : const Span<AttributePathParams> & aAttributePaths)
389 : {
390 2832 : for (auto & attribute : aAttributePaths)
391 : {
392 1925 : VerifyOrReturnError(attribute.IsValidAttributePath(), CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
393 1923 : AttributePathIB::Builder & path = aAttributePathIBsBuilder.CreatePath();
394 1923 : ReturnErrorOnFailure(aAttributePathIBsBuilder.GetError());
395 1923 : ReturnErrorOnFailure(path.Encode(attribute));
396 : }
397 :
398 907 : return aAttributePathIBsBuilder.EndOfAttributePathIBs();
399 : }
400 :
401 900 : CHIP_ERROR ReadClient::BuildDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
402 : const Span<AttributePathParams> & aAttributePaths,
403 : const Span<DataVersionFilter> & aDataVersionFilters,
404 : bool & aEncodedDataVersionList)
405 : {
406 : #if CHIP_PROGRESS_LOGGING
407 900 : size_t encodedFilterCount = 0;
408 900 : size_t irrelevantFilterCount = 0;
409 900 : size_t skippedFilterCount = 0;
410 : #endif
411 3393 : for (auto & filter : aDataVersionFilters)
412 : {
413 2494 : VerifyOrReturnError(filter.IsValidDataVersionFilter(), CHIP_ERROR_INVALID_ARGUMENT);
414 :
415 : // If data version filter is for some cluster none of whose attributes are included in our paths, discard this filter.
416 2494 : bool intersected = false;
417 2506 : for (auto & path : aAttributePaths)
418 : {
419 2500 : if (path.IncludesAttributesInCluster(filter))
420 : {
421 2488 : intersected = true;
422 2488 : break;
423 : }
424 : }
425 :
426 2494 : if (!intersected)
427 : {
428 : #if CHIP_PROGRESS_LOGGING
429 6 : ++irrelevantFilterCount;
430 : #endif
431 6 : continue;
432 : }
433 :
434 2488 : TLV::TLVWriter backup;
435 2488 : aDataVersionFilterIBsBuilder.Checkpoint(backup);
436 2488 : CHIP_ERROR err = aDataVersionFilterIBsBuilder.EncodeDataVersionFilterIB(filter);
437 2488 : if (err == CHIP_NO_ERROR)
438 : {
439 : #if CHIP_PROGRESS_LOGGING
440 2487 : ++encodedFilterCount;
441 : #endif
442 2487 : aEncodedDataVersionList = true;
443 : }
444 1 : else if (err == CHIP_ERROR_NO_MEMORY || err == CHIP_ERROR_BUFFER_TOO_SMALL)
445 : {
446 : // Packet is full, ignore the rest of the list
447 1 : aDataVersionFilterIBsBuilder.Rollback(backup);
448 : #if CHIP_PROGRESS_LOGGING
449 1 : ssize_t nonSkippedFilterCount = &filter - aDataVersionFilters.data();
450 1 : skippedFilterCount = aDataVersionFilters.size() - static_cast<size_t>(nonSkippedFilterCount);
451 : #endif // CHIP_PROGRESS_LOGGING
452 1 : break;
453 : }
454 : else
455 : {
456 0 : return err;
457 : }
458 : }
459 :
460 900 : ChipLogProgress(DataManagement,
461 : "%lu data version filters provided, %lu not relevant, %lu encoded, %lu skipped due to lack of space",
462 : static_cast<unsigned long>(aDataVersionFilters.size()), static_cast<unsigned long>(irrelevantFilterCount),
463 : static_cast<unsigned long>(encodedFilterCount), static_cast<unsigned long>(skippedFilterCount));
464 900 : return CHIP_NO_ERROR;
465 : }
466 :
467 905 : CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
468 : const Span<AttributePathParams> & aAttributePaths,
469 : const Span<DataVersionFilter> & aDataVersionFilters,
470 : bool & aEncodedDataVersionList)
471 : {
472 : // Give the callback a chance first, otherwise use the list we have, if any.
473 905 : ReturnErrorOnFailure(
474 : mpCallback.OnUpdateDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aEncodedDataVersionList));
475 :
476 905 : if (!aEncodedDataVersionList)
477 : {
478 900 : ReturnErrorOnFailure(BuildDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aDataVersionFilters,
479 : aEncodedDataVersionList));
480 : }
481 :
482 905 : return CHIP_NO_ERROR;
483 : }
484 :
485 3 : void ReadClient::OnActiveModeNotification()
486 : {
487 : // This function just tries to complete the deferred resubscription logic in `OnLivenessTimeoutCallback`.
488 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
489 : // If we are not in InactiveICDSubscription state, that means the liveness timeout has not been reached. Simply do nothing.
490 3 : VerifyOrReturn(IsInactiveICDSubscription());
491 :
492 : // When we reach here, the subscription definitely exceeded the liveness timeout. Just continue the unfinished resubscription
493 : // logic in `OnLivenessTimeoutCallback`.
494 1 : TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
495 : }
496 :
497 3 : void ReadClient::OnPeerTypeChange(PeerType aType)
498 : {
499 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
500 :
501 3 : mIsPeerLIT = (aType == PeerType::kLITICD);
502 :
503 3 : ChipLogProgress(DataManagement, "Peer is now %s LIT ICD.", mIsPeerLIT ? "a" : "not a");
504 :
505 : // If the peer is no longer LIT, try to wake up the subscription and do resubscribe when necessary.
506 3 : if (!mIsPeerLIT)
507 : {
508 2 : OnActiveModeNotification();
509 : }
510 3 : }
511 :
512 2040 : CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
513 : System::PacketBufferHandle && aPayload)
514 : {
515 2040 : CHIP_ERROR err = CHIP_NO_ERROR;
516 2040 : Status status = Status::InvalidAction;
517 2040 : VerifyOrExit(!IsIdle() && !IsInactiveICDSubscription(), err = CHIP_ERROR_INCORRECT_STATE);
518 :
519 2040 : if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::ReportData))
520 : {
521 1759 : err = ProcessReportData(std::move(aPayload), ReportType::kContinuingTransaction);
522 : }
523 281 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::SubscribeResponse))
524 : {
525 211 : ChipLogProgress(DataManagement, "SubscribeResponse is received");
526 211 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
527 211 : err = ProcessSubscribeResponse(std::move(aPayload));
528 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
529 : }
530 70 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::StatusResponse))
531 : {
532 138 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
533 70 : CHIP_ERROR statusError = CHIP_NO_ERROR;
534 70 : SuccessOrExit(err = StatusResponse::ProcessStatusResponse(std::move(aPayload), statusError));
535 70 : SuccessOrExit(err = statusError);
536 2 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
537 : }
538 : else
539 : {
540 0 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
541 : }
542 :
543 2040 : exit:
544 2040 : if (err != CHIP_NO_ERROR)
545 : {
546 76 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
547 : {
548 4 : status = Status::InvalidSubscription;
549 : }
550 76 : StatusResponse::Send(status, apExchangeContext, false /*aExpectResponse*/);
551 : }
552 :
553 2040 : if ((!IsSubscriptionType() && !mPendingMoreChunks) || err != CHIP_NO_ERROR)
554 : {
555 767 : Close(err);
556 : }
557 :
558 2040 : return err;
559 : }
560 :
561 82 : void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload)
562 : {
563 82 : Status status = Status::Success;
564 82 : mExchange.Grab(apExchangeContext);
565 :
566 : //
567 : // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
568 : // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
569 : // session to us, of which there could be multiple.
570 : //
571 : // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
572 : // to maximize our chances of success later.
573 : //
574 82 : mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());
575 :
576 82 : CHIP_ERROR err = ProcessReportData(std::move(aPayload), ReportType::kUnsolicited);
577 82 : if (err != CHIP_NO_ERROR)
578 : {
579 0 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
580 : {
581 0 : status = Status::InvalidSubscription;
582 : }
583 : else
584 : {
585 0 : status = Status::InvalidAction;
586 : }
587 :
588 0 : StatusResponse::Send(status, mExchange.Get(), false /*aExpectResponse*/);
589 0 : Close(err);
590 : }
591 82 : }
592 :
593 1849 : CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload, ReportType aReportType)
594 : {
595 1849 : CHIP_ERROR err = CHIP_NO_ERROR;
596 1849 : ReportDataMessage::Parser report;
597 1849 : bool suppressResponse = true;
598 1849 : SubscriptionId subscriptionId = 0;
599 1849 : EventReportIBs::Parser eventReportIBs;
600 1849 : AttributeReportIBs::Parser attributeReportIBs;
601 1849 : System::PacketBufferTLVReader reader;
602 1849 : reader.Init(std::move(aPayload));
603 1849 : err = report.Init(reader);
604 1849 : SuccessOrExit(err);
605 :
606 : #if CHIP_CONFIG_IM_PRETTY_PRINT
607 1847 : if (aReportType != ReportType::kUnsolicited)
608 : {
609 1765 : report.PrettyPrint();
610 : }
611 : #endif
612 :
613 1847 : err = report.GetSuppressResponse(&suppressResponse);
614 1847 : if (CHIP_END_OF_TLV == err)
615 : {
616 1148 : suppressResponse = false;
617 1148 : err = CHIP_NO_ERROR;
618 : }
619 1847 : SuccessOrExit(err);
620 :
621 1847 : err = report.GetSubscriptionId(&subscriptionId);
622 1847 : if (CHIP_NO_ERROR == err)
623 : {
624 345 : VerifyOrExit(IsSubscriptionType(), err = CHIP_ERROR_INVALID_ARGUMENT);
625 343 : if (mWaitingForFirstPrimingReport)
626 : {
627 213 : mSubscriptionId = subscriptionId;
628 : }
629 130 : else if (!IsMatchingSubscriptionId(subscriptionId))
630 : {
631 2 : err = CHIP_ERROR_INVALID_SUBSCRIPTION;
632 : }
633 : }
634 1502 : else if (CHIP_END_OF_TLV == err)
635 : {
636 1502 : if (IsSubscriptionType())
637 : {
638 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
639 : }
640 : else
641 : {
642 1502 : err = CHIP_NO_ERROR;
643 : }
644 : }
645 1845 : SuccessOrExit(err);
646 :
647 1843 : err = report.GetMoreChunkedMessages(&mPendingMoreChunks);
648 1843 : if (CHIP_END_OF_TLV == err)
649 : {
650 980 : mPendingMoreChunks = false;
651 980 : err = CHIP_NO_ERROR;
652 : }
653 1843 : SuccessOrExit(err);
654 :
655 1843 : err = report.GetEventReports(&eventReportIBs);
656 1843 : if (err == CHIP_END_OF_TLV)
657 : {
658 1203 : err = CHIP_NO_ERROR;
659 : }
660 640 : else if (err == CHIP_NO_ERROR)
661 : {
662 640 : chip::TLV::TLVReader EventReportsReader;
663 640 : eventReportIBs.GetReader(&EventReportsReader);
664 640 : err = ProcessEventReportIBs(EventReportsReader);
665 : }
666 1843 : SuccessOrExit(err);
667 :
668 1843 : err = report.GetAttributeReportIBs(&attributeReportIBs);
669 1843 : if (err == CHIP_END_OF_TLV)
670 : {
671 631 : err = CHIP_NO_ERROR;
672 : }
673 1212 : else if (err == CHIP_NO_ERROR)
674 : {
675 1212 : TLV::TLVReader attributeReportIBsReader;
676 1212 : attributeReportIBs.GetReader(&attributeReportIBsReader);
677 1212 : err = ProcessAttributeReportIBs(attributeReportIBsReader);
678 : }
679 1843 : SuccessOrExit(err);
680 :
681 1841 : if (mIsReporting && !mPendingMoreChunks)
682 : {
683 949 : mpCallback.OnReportEnd();
684 949 : mIsReporting = false;
685 : }
686 :
687 1841 : SuccessOrExit(err = report.ExitContainer());
688 :
689 1849 : exit:
690 1849 : if (IsSubscriptionType())
691 : {
692 345 : if (IsAwaitingInitialReport())
693 : {
694 215 : MoveToState(ClientState::AwaitingSubscribeResponse);
695 : }
696 130 : else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
697 : {
698 : //
699 : // Only refresh the liveness check timer if we've successfully established
700 : // a subscription and have a valid value for mMaxInterval which the function
701 : // relies on.
702 : //
703 103 : mpCallback.NotifySubscriptionStillActive(*this);
704 103 : err = RefreshLivenessCheckTimer();
705 : }
706 : }
707 :
708 1849 : if (!suppressResponse && err == CHIP_NO_ERROR)
709 : {
710 1146 : bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks;
711 1146 : err = StatusResponse::Send(Status::Success, mExchange.Get(), !noResponseExpected);
712 : }
713 :
714 1849 : mWaitingForFirstPrimingReport = false;
715 1849 : return err;
716 1849 : }
717 :
718 11 : void ReadClient::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext)
719 : {
720 11 : ChipLogError(DataManagement, "Time out! failed to receive report data from Exchange: " ChipLogFormatExchange,
721 : ChipLogValueExchange(apExchangeContext));
722 :
723 11 : Close(CHIP_ERROR_TIMEOUT);
724 11 : }
725 :
726 5 : CHIP_ERROR ReadClient::ReadICDOperatingModeFromAttributeDataIB(TLV::TLVReader && aReader, PeerType & aType)
727 : {
728 : Clusters::IcdManagement::Attributes::OperatingMode::TypeInfo::DecodableType operatingMode;
729 :
730 5 : CHIP_ERROR err = DataModel::Decode(aReader, operatingMode);
731 5 : ReturnErrorOnFailure(err);
732 :
733 5 : switch (operatingMode)
734 : {
735 4 : case Clusters::IcdManagement::OperatingModeEnum::kSit:
736 4 : aType = PeerType::kNormal;
737 4 : break;
738 1 : case Clusters::IcdManagement::OperatingModeEnum::kLit:
739 1 : aType = PeerType::kLITICD;
740 1 : break;
741 0 : default:
742 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
743 0 : break;
744 : }
745 :
746 5 : return err;
747 : }
748 :
749 4357 : CHIP_ERROR ReadClient::ProcessAttributePath(AttributePathIB::Parser & aAttributePathParser,
750 : ConcreteDataAttributePath & aAttributePath)
751 : {
752 : // The ReportData must contain a concrete attribute path. Don't validate ID
753 : // ranges here, so we can tell apart "malformed data" and "out of range
754 : // IDs".
755 4357 : CHIP_ERROR err = CHIP_NO_ERROR;
756 : // The ReportData must contain a concrete attribute path
757 4357 : err = aAttributePathParser.GetConcreteAttributePath(aAttributePath, AttributePathIB::ValidateIdRanges::kNo);
758 4357 : VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
759 4355 : return CHIP_NO_ERROR;
760 : }
761 :
762 5932 : void ReadClient::NoteReportingData()
763 : {
764 5932 : if (!mIsReporting)
765 : {
766 1003 : mpCallback.OnReportBegin();
767 1003 : mIsReporting = true;
768 : }
769 5932 : }
770 :
771 1212 : CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeReportIBsReader)
772 : {
773 1212 : CHIP_ERROR err = CHIP_NO_ERROR;
774 5567 : while (CHIP_NO_ERROR == (err = aAttributeReportIBsReader.Next()))
775 : {
776 4357 : TLV::TLVReader dataReader;
777 4357 : AttributeReportIB::Parser report;
778 4357 : AttributeDataIB::Parser data;
779 4357 : AttributeStatusIB::Parser status;
780 4357 : AttributePathIB::Parser path;
781 4357 : ConcreteDataAttributePath attributePath;
782 4357 : StatusIB statusIB;
783 :
784 4357 : TLV::TLVReader reader = aAttributeReportIBsReader;
785 4357 : ReturnErrorOnFailure(report.Init(reader));
786 :
787 4357 : err = report.GetAttributeStatus(&status);
788 4357 : if (CHIP_NO_ERROR == err)
789 : {
790 257 : StatusIB::Parser errorStatus;
791 257 : ReturnErrorOnFailure(status.GetPath(&path));
792 257 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
793 257 : if (!attributePath.IsValid())
794 : {
795 : // Don't fail the entire read or subscription when there is an
796 : // out-of-range ID. Just skip that one AttributeReportIB.
797 0 : ChipLogError(DataManagement,
798 : "Skipping AttributeStatusIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
799 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
800 : ChipLogValueMEI(attributePath.mAttributeId));
801 0 : continue;
802 : }
803 :
804 257 : ReturnErrorOnFailure(status.GetErrorStatus(&errorStatus));
805 257 : ReturnErrorOnFailure(errorStatus.DecodeStatusIB(statusIB));
806 257 : NoteReportingData();
807 257 : mpCallback.OnAttributeData(attributePath, nullptr, statusIB);
808 : }
809 4100 : else if (CHIP_END_OF_TLV == err)
810 : {
811 4100 : ReturnErrorOnFailure(report.GetAttributeData(&data));
812 4100 : ReturnErrorOnFailure(data.GetPath(&path));
813 4100 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
814 4098 : if (!attributePath.IsValid())
815 : {
816 : // Don't fail the entire read or subscription when there is an
817 : // out-of-range ID. Just skip that one AttributeReportIB.
818 2 : ChipLogError(DataManagement,
819 : "Skipping AttributeDataIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
820 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
821 : ChipLogValueMEI(attributePath.mAttributeId));
822 2 : continue;
823 : }
824 :
825 4096 : DataVersion version = 0;
826 4096 : ReturnErrorOnFailure(data.GetDataVersion(&version));
827 4096 : attributePath.mDataVersion.SetValue(version);
828 :
829 4096 : if (mReadPrepareParams.mpDataVersionFilterList != nullptr)
830 : {
831 65 : UpdateDataVersionFilters(attributePath);
832 : }
833 :
834 4096 : ReturnErrorOnFailure(data.GetData(&dataReader));
835 :
836 : // The element in an array may be another array -- so we should only set the list operation when we are handling the
837 : // whole list.
838 4096 : if (!attributePath.IsListOperation() && dataReader.GetType() == TLV::kTLVType_Array)
839 : {
840 1872 : attributePath.mListOp = ConcreteDataAttributePath::ListOperation::ReplaceAll;
841 : }
842 :
843 4096 : if (attributePath.MatchesConcreteAttributePath(ConcreteAttributePath(
844 : kRootEndpointId, Clusters::IcdManagement::Id, Clusters::IcdManagement::Attributes::OperatingMode::Id)))
845 : {
846 : PeerType peerType;
847 5 : TLV::TLVReader operatingModeTlvReader;
848 5 : operatingModeTlvReader.Init(dataReader);
849 5 : if (CHIP_NO_ERROR == ReadICDOperatingModeFromAttributeDataIB(std::move(operatingModeTlvReader), peerType))
850 : {
851 : // It is safe to call `OnPeerTypeChange` since we are in the middle of parsing the attribute data, And
852 : // the subscription should be active so `OnActiveModeNotification` is a no-op in this case.
853 5 : InteractionModelEngine::GetInstance()->OnPeerTypeChange(mPeer, peerType);
854 : }
855 : else
856 : {
857 0 : ChipLogError(DataManagement, "Failed to get ICD state from attribute data with error'%" CHIP_ERROR_FORMAT "'",
858 : err.Format());
859 : }
860 : }
861 :
862 4096 : NoteReportingData();
863 4096 : mpCallback.OnAttributeData(attributePath, &dataReader, statusIB);
864 : }
865 4361 : }
866 :
867 1210 : if (CHIP_END_OF_TLV == err)
868 : {
869 1210 : err = CHIP_NO_ERROR;
870 : }
871 :
872 1210 : return err;
873 : }
874 :
875 640 : CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader)
876 : {
877 640 : CHIP_ERROR err = CHIP_NO_ERROR;
878 2219 : while (CHIP_NO_ERROR == (err = aEventReportIBsReader.Next()))
879 : {
880 1579 : TLV::TLVReader dataReader;
881 1579 : EventReportIB::Parser report;
882 1579 : EventDataIB::Parser data;
883 1579 : EventHeader header;
884 1579 : StatusIB statusIB; // Default value for statusIB is success.
885 :
886 1579 : TLV::TLVReader reader = aEventReportIBsReader;
887 1579 : ReturnErrorOnFailure(report.Init(reader));
888 :
889 1579 : err = report.GetEventData(&data);
890 :
891 1579 : if (err == CHIP_NO_ERROR)
892 : {
893 1575 : header.mTimestamp = mEventTimestamp;
894 1575 : ReturnErrorOnFailure(data.DecodeEventHeader(header));
895 1575 : mEventTimestamp = header.mTimestamp;
896 :
897 1575 : ReturnErrorOnFailure(data.GetData(&dataReader));
898 :
899 : //
900 : // Update the event number being tracked in mReadPrepareParams in case
901 : // we want to send it in the next SubscribeRequest message to convey
902 : // the event number for which we have already received an event.
903 : //
904 1575 : mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1);
905 :
906 1575 : NoteReportingData();
907 1575 : mpCallback.OnEventData(header, &dataReader, nullptr);
908 : }
909 4 : else if (err == CHIP_END_OF_TLV)
910 : {
911 4 : EventStatusIB::Parser status;
912 4 : EventPathIB::Parser pathIB;
913 4 : StatusIB::Parser statusIBParser;
914 4 : ReturnErrorOnFailure(report.GetEventStatus(&status));
915 4 : ReturnErrorOnFailure(status.GetPath(&pathIB));
916 4 : ReturnErrorOnFailure(pathIB.GetEventPath(&header.mPath));
917 4 : ReturnErrorOnFailure(status.GetErrorStatus(&statusIBParser));
918 4 : ReturnErrorOnFailure(statusIBParser.DecodeStatusIB(statusIB));
919 :
920 4 : NoteReportingData();
921 4 : mpCallback.OnEventData(header, nullptr, &statusIB);
922 : }
923 1579 : }
924 :
925 640 : if (CHIP_END_OF_TLV == err)
926 : {
927 640 : err = CHIP_NO_ERROR;
928 : }
929 :
930 640 : return err;
931 : }
932 :
933 0 : void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
934 : {
935 0 : mLivenessTimeoutOverride = aLivenessTimeout;
936 0 : auto err = RefreshLivenessCheckTimer();
937 0 : if (err != CHIP_NO_ERROR)
938 : {
939 0 : Close(err);
940 : }
941 0 : }
942 :
943 312 : CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
944 : {
945 312 : CHIP_ERROR err = CHIP_NO_ERROR;
946 :
947 312 : VerifyOrReturnError(IsSubscriptionActive(), CHIP_ERROR_INCORRECT_STATE);
948 :
949 312 : CancelLivenessCheckTimer();
950 :
951 : System::Clock::Timeout timeout;
952 312 : ReturnErrorOnFailure(ComputeLivenessCheckTimerTimeout(&timeout));
953 :
954 : // EFR32/MBED/INFINION/K32W's chrono count return long unsigned, but other platform returns unsigned
955 312 : ChipLogProgress(
956 : DataManagement,
957 : "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64,
958 : static_cast<long unsigned>(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()));
959 312 : err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
960 : timeout, OnLivenessTimeoutCallback, this);
961 :
962 312 : return err;
963 : }
964 :
965 312 : CHIP_ERROR ReadClient::ComputeLivenessCheckTimerTimeout(System::Clock::Timeout * aTimeout)
966 : {
967 312 : if (mLivenessTimeoutOverride != System::Clock::kZero)
968 : {
969 0 : *aTimeout = mLivenessTimeoutOverride;
970 0 : return CHIP_NO_ERROR;
971 : }
972 :
973 312 : VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE);
974 :
975 : //
976 : // To calculate the duration we're willing to wait for a report to come to us, we take into account the maximum interval of
977 : // the subscription AND the time it takes for the report to make it to us in the worst case.
978 : //
979 : // We have no way to estimate what the network latency will be, but we do know the other side will time out its ReportData
980 : // after its computed round-trip timeout plus the processing time it gives us (app::kExpectedIMProcessingTime). Once it
981 : // times out, assuming it sent the report at all, there's no point in us thinking we still have a subscription.
982 : //
983 : // We can't use ComputeRoundTripTimeout() on the session for two reasons: we want the roundtrip timeout from the point of
984 : // view of the peer, not us, and we want to start off with the assumption the peer will likely have, which is that we are
985 : // idle, whereas ComputeRoundTripTimeout() uses the current activity state of the peer.
986 : //
987 : // So recompute the round-trip timeout directly. Assume MRP, since in practice that is likely what is happening.
988 312 : auto & peerMRPConfig = mReadPrepareParams.mSessionHolder->GetRemoteMRPConfig();
989 : // Peer will assume we are idle (hence we pass kZero to GetMessageReceiptTimeout()), but will assume we treat it as active
990 : // for the response, so to match the retransmission timeout computation for the message back to the peeer, we should treat
991 : // it as active.
992 312 : auto roundTripTimeout = mReadPrepareParams.mSessionHolder->GetMessageReceiptTimeout(System::Clock::kZero) +
993 312 : kExpectedIMProcessingTime +
994 312 : GetRetransmissionTimeout(peerMRPConfig.mActiveRetransTimeout, peerMRPConfig.mIdleRetransTimeout,
995 624 : System::SystemClock().GetMonotonicTimestamp(), peerMRPConfig.mActiveThresholdTime);
996 312 : *aTimeout = System::Clock::Seconds16(mMaxInterval) + roundTripTimeout;
997 312 : return CHIP_NO_ERROR;
998 : }
999 :
1000 762 : void ReadClient::CancelLivenessCheckTimer()
1001 : {
1002 762 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1003 : OnLivenessTimeoutCallback, this);
1004 762 : }
1005 :
1006 450 : void ReadClient::CancelResubscribeTimer()
1007 : {
1008 450 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1009 : OnResubscribeTimerCallback, this);
1010 450 : mIsResubscriptionScheduled = false;
1011 450 : }
1012 :
1013 6 : void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
1014 : {
1015 6 : ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState);
1016 :
1017 : // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e
1018 : // response timeouts).
1019 6 : CHIP_ERROR subscriptionTerminationCause = CHIP_ERROR_TIMEOUT;
1020 :
1021 : //
1022 : // Might as well try to see if this instance exists in the tracked list in the IM.
1023 : // This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since
1024 : // been shutdown at which point the client wouldn't exist in the active read client list.
1025 : //
1026 6 : VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this));
1027 :
1028 6 : ChipLogError(DataManagement,
1029 : "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64,
1030 : _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId()));
1031 :
1032 6 : if (_this->mIsPeerLIT)
1033 : {
1034 3 : subscriptionTerminationCause = CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1035 : }
1036 :
1037 6 : _this->TriggerResubscriptionForLivenessTimeout(subscriptionTerminationCause);
1038 6 : }
1039 :
1040 7 : void ReadClient::TriggerResubscriptionForLivenessTimeout(CHIP_ERROR aReason)
1041 : {
1042 : // We didn't get a message from the server on time; it's possible that it no
1043 : // longer has a useful CASE session to us. Mark defunct all sessions that
1044 : // have not seen peer activity in at least as long as our session.
1045 7 : const auto & holder = mReadPrepareParams.mSessionHolder;
1046 7 : if (holder)
1047 : {
1048 7 : System::Clock::Timestamp lastPeerActivity = holder->AsSecureSession()->GetLastPeerActivityTime();
1049 7 : mpImEngine->GetExchangeManager()->GetSessionManager()->ForEachMatchingSession(mPeer, [&lastPeerActivity](auto * session) {
1050 7 : if (!session->IsCASESession())
1051 : {
1052 7 : return;
1053 : }
1054 :
1055 0 : if (session->GetLastPeerActivityTime() > lastPeerActivity)
1056 : {
1057 0 : return;
1058 : }
1059 :
1060 0 : session->MarkAsDefunct();
1061 : });
1062 : }
1063 :
1064 7 : Close(aReason);
1065 7 : }
1066 :
1067 211 : CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload)
1068 : {
1069 211 : System::PacketBufferTLVReader reader;
1070 211 : reader.Init(std::move(aPayload));
1071 :
1072 211 : SubscribeResponseMessage::Parser subscribeResponse;
1073 211 : ReturnErrorOnFailure(subscribeResponse.Init(reader));
1074 :
1075 : #if CHIP_CONFIG_IM_PRETTY_PRINT
1076 211 : subscribeResponse.PrettyPrint();
1077 : #endif
1078 :
1079 211 : SubscriptionId subscriptionId = 0;
1080 211 : VerifyOrReturnError(subscribeResponse.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, CHIP_ERROR_INVALID_ARGUMENT);
1081 211 : VerifyOrReturnError(IsMatchingSubscriptionId(subscriptionId), CHIP_ERROR_INVALID_SUBSCRIPTION);
1082 209 : ReturnErrorOnFailure(subscribeResponse.GetMaxInterval(&mMaxInterval));
1083 :
1084 : #if CHIP_PROGRESS_LOGGING
1085 209 : auto duration = System::Clock::Milliseconds32(System::SystemClock().GetMonotonicTimestamp() - mSubscribeRequestTime);
1086 : #endif
1087 209 : ChipLogProgress(DataManagement,
1088 : "Subscription established in %" PRIu32 "ms with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u"
1089 : "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64,
1090 : duration.count(), mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(),
1091 : ChipLogValueX64(GetPeerNodeId()));
1092 :
1093 209 : ReturnErrorOnFailure(subscribeResponse.ExitContainer());
1094 :
1095 209 : MoveToState(ClientState::SubscriptionActive);
1096 :
1097 209 : mpCallback.OnSubscriptionEstablished(subscriptionId);
1098 :
1099 209 : mNumRetries = 0;
1100 :
1101 209 : ReturnErrorOnFailure(RefreshLivenessCheckTimer());
1102 :
1103 209 : return CHIP_NO_ERROR;
1104 211 : }
1105 :
1106 157 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams)
1107 : {
1108 : // Make sure we don't use minimal resubscribe delays from previous attempts
1109 : // for this one.
1110 157 : mMinimalResubscribeDelay = System::Clock::kZero;
1111 :
1112 157 : mReadPrepareParams = std::move(aReadPrepareParams);
1113 157 : CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams);
1114 157 : if (err != CHIP_NO_ERROR)
1115 : {
1116 1 : StopResubscription();
1117 : }
1118 157 : return err;
1119 : }
1120 :
1121 0 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(const ScopedNodeId & aPublisherId, ReadPrepareParams && aReadPrepareParams)
1122 : {
1123 0 : mPeer = aPublisherId;
1124 0 : mReadPrepareParams = std::move(aReadPrepareParams);
1125 0 : CHIP_ERROR err = EstablishSessionToPeer();
1126 0 : if (err != CHIP_NO_ERROR)
1127 : {
1128 : // Make sure we call our callback's OnDeallocatePaths.
1129 0 : StopResubscription();
1130 : }
1131 0 : return err;
1132 : }
1133 :
1134 242 : CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepareParams)
1135 : {
1136 242 : VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
1137 : CHIP_ERROR_INVALID_ARGUMENT);
1138 :
1139 239 : auto err = SendSubscribeRequestImpl(aReadPrepareParams);
1140 239 : if (CHIP_NO_ERROR != err)
1141 : {
1142 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
1143 : }
1144 239 : return err;
1145 : }
1146 :
1147 239 : CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadPrepareParams)
1148 : {
1149 : MATTER_LOG_METRIC_BEGIN(Tracing::kMetricDeviceSubscriptionSetup);
1150 :
1151 : #if CHIP_PROGRESS_LOGGING
1152 239 : mSubscribeRequestTime = System::SystemClock().GetMonotonicTimestamp();
1153 : #endif
1154 :
1155 239 : VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);
1156 :
1157 239 : if (&aReadPrepareParams != &mReadPrepareParams)
1158 : {
1159 76 : mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
1160 : }
1161 :
1162 239 : mIsPeerLIT = aReadPrepareParams.mIsPeerLIT;
1163 :
1164 239 : mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;
1165 :
1166 : // Todo: Remove the below, Update span in ReadPrepareParams
1167 239 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
1168 239 : aReadPrepareParams.mAttributePathParamsListSize);
1169 239 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
1170 239 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
1171 239 : aReadPrepareParams.mDataVersionFilterListSize);
1172 :
1173 239 : System::PacketBufferHandle msgBuf;
1174 239 : System::PacketBufferTLVWriter writer;
1175 239 : SubscribeRequestMessage::Builder request;
1176 239 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
1177 :
1178 239 : ReturnErrorOnFailure(request.Init(&writer));
1179 :
1180 239 : request.KeepSubscriptions(aReadPrepareParams.mKeepSubscriptions)
1181 239 : .MinIntervalFloorSeconds(aReadPrepareParams.mMinIntervalFloorSeconds)
1182 239 : .MaxIntervalCeilingSeconds(aReadPrepareParams.mMaxIntervalCeilingSeconds);
1183 :
1184 239 : if (!attributePaths.empty())
1185 : {
1186 232 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
1187 232 : ReturnErrorOnFailure(attributePathListBuilder.GetError());
1188 232 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
1189 : }
1190 :
1191 239 : if (!eventPaths.empty())
1192 : {
1193 22 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
1194 22 : ReturnErrorOnFailure(eventPathListBuilder.GetError());
1195 22 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
1196 :
1197 22 : Optional<EventNumber> eventMin;
1198 22 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
1199 22 : if (eventMin.HasValue())
1200 : {
1201 0 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
1202 0 : ReturnErrorOnFailure(request.GetError());
1203 0 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
1204 : }
1205 22 : }
1206 :
1207 239 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
1208 :
1209 239 : bool encodedDataVersionList = false;
1210 239 : TLV::TLVWriter backup;
1211 239 : request.Checkpoint(backup);
1212 239 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
1213 239 : ReturnErrorOnFailure(request.GetError());
1214 239 : if (!attributePaths.empty())
1215 : {
1216 232 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
1217 : encodedDataVersionList));
1218 : }
1219 239 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
1220 239 : if (encodedDataVersionList)
1221 : {
1222 65 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
1223 : }
1224 : else
1225 : {
1226 174 : request.Rollback(backup);
1227 : }
1228 :
1229 239 : ReturnErrorOnFailure(request.EndOfSubscribeRequestMessage());
1230 239 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
1231 :
1232 239 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
1233 :
1234 239 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
1235 239 : if (exchange == nullptr)
1236 : {
1237 0 : if (aReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1238 : {
1239 0 : return CHIP_ERROR_NO_MEMORY;
1240 : }
1241 :
1242 : // Trying to subscribe with a defunct session somehow.
1243 0 : return CHIP_ERROR_INCORRECT_STATE;
1244 : }
1245 :
1246 239 : mExchange.Grab(exchange);
1247 :
1248 239 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
1249 : {
1250 239 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
1251 : }
1252 : else
1253 : {
1254 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
1255 : }
1256 :
1257 239 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
1258 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
1259 :
1260 239 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
1261 239 : MoveToState(ClientState::AwaitingInitialReport);
1262 :
1263 239 : return CHIP_NO_ERROR;
1264 239 : }
1265 :
1266 6 : CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause)
1267 : {
1268 6 : if (aTerminationCause == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
1269 : {
1270 0 : ChipLogProgress(DataManagement, "ICD device is inactive, skipping scheduling resubscribe within DefaultResubscribePolicy");
1271 0 : return CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1272 : }
1273 :
1274 6 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
1275 :
1276 6 : auto timeTillNextResubscription = ComputeTimeTillNextSubscription();
1277 6 : ChipLogProgress(DataManagement,
1278 : "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
1279 : "ms due to error %" CHIP_ERROR_FORMAT,
1280 : GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription,
1281 : aTerminationCause.Format());
1282 6 : return ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT);
1283 : }
1284 :
1285 0 : void ReadClient::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
1286 : const SessionHandle & sessionHandle)
1287 : {
1288 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1289 0 : VerifyOrDie(_this != nullptr);
1290 :
1291 0 : ChipLogProgress(DataManagement, "HandleDeviceConnected");
1292 0 : _this->mReadPrepareParams.mSessionHolder.Grab(sessionHandle);
1293 0 : _this->mpExchangeMgr = &exchangeMgr;
1294 :
1295 0 : _this->mpCallback.OnCASESessionEstablished(sessionHandle, _this->mReadPrepareParams);
1296 :
1297 0 : auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1298 0 : if (err != CHIP_NO_ERROR)
1299 : {
1300 0 : _this->Close(err);
1301 : }
1302 0 : }
1303 :
1304 0 : void ReadClient::HandleDeviceConnectionFailure(void * context, const OperationalSessionSetup::ConnectionFailureInfo & failureInfo)
1305 : {
1306 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1307 0 : VerifyOrDie(_this != nullptr);
1308 :
1309 0 : ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'",
1310 : failureInfo.error.Format());
1311 :
1312 : #if CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1313 : #if CHIP_DETAIL_LOGGING
1314 0 : if (failureInfo.requestedBusyDelay.HasValue())
1315 : {
1316 0 : ChipLogDetail(DataManagement, "Will delay resubscription by %u ms due to BUSY response",
1317 : failureInfo.requestedBusyDelay.Value().count());
1318 : }
1319 : #endif // CHIP_DETAIL_LOGGING
1320 0 : _this->mMinimalResubscribeDelay = failureInfo.requestedBusyDelay.ValueOr(System::Clock::kZero);
1321 : #else
1322 : _this->mMinimalResubscribeDelay = System::Clock::kZero;
1323 : #endif // CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1324 :
1325 0 : _this->Close(failureInfo.error);
1326 0 : }
1327 :
1328 7 : void ReadClient::OnResubscribeTimerCallback(System::Layer * /* If this starts being used, fix callers that pass nullptr */,
1329 : void * apAppState)
1330 : {
1331 7 : ReadClient * const _this = static_cast<ReadClient *>(apAppState);
1332 7 : VerifyOrDie(_this != nullptr);
1333 :
1334 7 : _this->mIsResubscriptionScheduled = false;
1335 :
1336 : CHIP_ERROR err;
1337 :
1338 7 : ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
1339 7 : _this->mNumRetries++;
1340 :
1341 7 : bool allowResubscribeOnError = true;
1342 14 : if (!_this->mReadPrepareParams.mSessionHolder ||
1343 7 : !_this->mReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1344 : {
1345 : // We don't have an active CASE session. We need to go ahead and set
1346 : // one up, if we can.
1347 0 : if (_this->EstablishSessionToPeer() == CHIP_NO_ERROR)
1348 : {
1349 0 : return;
1350 : }
1351 :
1352 0 : if (_this->mForceCaseOnNextResub)
1353 : {
1354 : // Caller asked us to force CASE but we have no way to do CASE.
1355 : // Just stop trying.
1356 0 : allowResubscribeOnError = false;
1357 : }
1358 :
1359 : // No way to send our subscribe request.
1360 0 : err = CHIP_ERROR_INCORRECT_STATE;
1361 0 : ExitNow();
1362 : }
1363 :
1364 7 : err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1365 :
1366 7 : exit:
1367 7 : if (err != CHIP_NO_ERROR)
1368 : {
1369 : //
1370 : // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid
1371 : // CASESessionManager pointer when mForceCaseOnNextResub was true.
1372 : //
1373 : // In that case, don't permit re-subscription to occur.
1374 : //
1375 0 : _this->Close(err, allowResubscribeOnError);
1376 : }
1377 : }
1378 :
1379 65 : void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath)
1380 : {
1381 130 : for (size_t index = 0; index < mReadPrepareParams.mDataVersionFilterListSize; index++)
1382 : {
1383 65 : if (mReadPrepareParams.mpDataVersionFilterList[index].mEndpointId == aPath.mEndpointId &&
1384 65 : mReadPrepareParams.mpDataVersionFilterList[index].mClusterId == aPath.mClusterId)
1385 : {
1386 : // Now we know the current version for this cluster is aPath.mDataVersion.
1387 65 : mReadPrepareParams.mpDataVersionFilterList[index].mDataVersion = aPath.mDataVersion;
1388 : }
1389 : }
1390 65 : }
1391 :
1392 339 : CHIP_ERROR ReadClient::GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin)
1393 : {
1394 339 : if (aReadPrepareParams.mEventNumber.HasValue())
1395 : {
1396 312 : aEventMin = aReadPrepareParams.mEventNumber;
1397 : }
1398 : else
1399 : {
1400 27 : ReturnErrorOnFailure(mpCallback.GetHighestReceivedEventNumber(aEventMin));
1401 27 : if (aEventMin.HasValue())
1402 : {
1403 : // We want to start with the first event _after_ the last one we received.
1404 2 : aEventMin.SetValue(aEventMin.Value() + 1);
1405 : }
1406 : }
1407 339 : return CHIP_NO_ERROR;
1408 : }
1409 :
1410 195 : void ReadClient::TriggerResubscribeIfScheduled(const char * reason)
1411 : {
1412 195 : if (!mIsResubscriptionScheduled)
1413 : {
1414 195 : return;
1415 : }
1416 :
1417 0 : ChipLogDetail(DataManagement, "ReadClient[%p] triggering resubscribe, reason: %s", this, reason);
1418 0 : CancelResubscribeTimer();
1419 0 : OnResubscribeTimerCallback(nullptr, this);
1420 : }
1421 :
1422 0 : Optional<System::Clock::Timeout> ReadClient::GetSubscriptionTimeout()
1423 : {
1424 0 : if (!IsSubscriptionType() || !IsSubscriptionActive())
1425 : {
1426 0 : return NullOptional;
1427 : }
1428 :
1429 : System::Clock::Timeout timeout;
1430 0 : CHIP_ERROR err = ComputeLivenessCheckTimerTimeout(&timeout);
1431 0 : if (err != CHIP_NO_ERROR)
1432 : {
1433 0 : return NullOptional;
1434 : }
1435 :
1436 0 : return MakeOptional(timeout);
1437 : }
1438 :
1439 0 : CHIP_ERROR ReadClient::EstablishSessionToPeer()
1440 : {
1441 0 : ChipLogProgress(DataManagement, "Trying to establish a CASE session for subscription");
1442 0 : auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
1443 0 : VerifyOrReturnError(caseSessionManager != nullptr, CHIP_ERROR_INCORRECT_STATE);
1444 0 : caseSessionManager->FindOrEstablishSession(mPeer, &mOnConnectedCallback, &mOnConnectionFailureCallback);
1445 0 : return CHIP_NO_ERROR;
1446 : }
1447 :
1448 : } // namespace app
1449 : } // namespace chip
|