/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.DeliveryImpl;


import hunt.proton.amqp.Binary;
import hunt.proton.amqp.transport.DeliveryState;
import hunt.proton.codec.CompositeReadableBuffer;
import hunt.proton.codec.ReadableBuffer;
import hunt.proton.codec.WritableBuffer;
import hunt.proton.engine.Delivery;
import hunt.proton.engine.Record;
import hunt.proton.engine.Transport;
import hunt.proton.engine.impl.LinkImpl;
import hunt.proton.engine.impl.TransportDelivery;
import hunt.proton.engine.impl.SenderImpl;
import hunt.proton.engine.impl.ReceiverImpl;
import std.algorithm;
import hunt.proton.engine.impl.RecordImpl;
import std.concurrency : initOnce;


/**
 * `Delivery` implementation bound to a `LinkImpl`.
 *
 * An instance tracks the local and remote delivery state and settlement
 * flags, buffers payload bytes, and carries the previous/next references
 * for three intrusive lists: the per-link delivery list, the work list and
 * the transport work list.
 */
class DeliveryImpl : Delivery
{
    public static int DEFAULT_MESSAGE_FORMAT = 0;

    // private static ReadableBuffer EMPTY_BUFFER = ByteBufferReader.allocate(0);

    /**
     * Shared zero-length buffer, created on first use through `initOnce`.
     *
     * Returns: the single `__gshared` empty `ReadableBuffer` instance.
     */
    static ReadableBuffer EMPTY_BUFFER()
    {
        __gshared ReadableBuffer inst;
        return initOnce!inst(ByteBufferReader.allocate(0));
    }

    // Neighbours in the per-link delivery list.
    private DeliveryImpl _linkPrevious;
    private DeliveryImpl _linkNext;

    // Neighbours in the work list, plus the "on the work list" marker.
    private DeliveryImpl _workNext;
    private DeliveryImpl _workPrev;
    bool _work;

    // Neighbours in the transport work list, plus its membership marker.
    private DeliveryImpl _transportWorkNext;
    private DeliveryImpl _transportWorkPrev;
    bool _transportWork;

    private Record _attachments;
    private Object _context;

    private byte[] _tag;
    private LinkImpl _link;
    private DeliveryState _deliveryState;
    private bool _settled;
    private bool _remoteSettled;
    private DeliveryState _remoteDeliveryState;
    private DeliveryState _defaultDeliveryState = null;
    private int _messageFormat ; //= DEFAULT_MESSAGE_FORMAT;

    /**
     * A bit-mask representing the outstanding work on this delivery received from the transport layer
     * that has not yet been processed by the application.
     */
    private int _flags = cast(byte) 0;

    private TransportDelivery _transportDelivery;
    private bool _complete;
    private bool _updated;
    private bool _done;
    private bool _aborted;

    // _dataBuffer owns copied payload; _dataView is what readers/senders see.
    // The view is either the composite buffer itself or a caller-supplied
    // buffer installed by sendNoCopy().
    private CompositeReadableBuffer _dataBuffer;
    private ReadableBuffer _dataView;

    /**
     * Creates a delivery on `link` and chains it after `previous`.
     *
     * The link's unsettled count is incremented and the message format is
     * initialised to `DEFAULT_MESSAGE_FORMAT`.
     *
     * Params:
     *   tag      = delivery tag bytes (stored by reference, not copied)
     *   link     = owning link; must not be null
     *   previous = preceding delivery on the link, or null
     */
    this(byte[] tag, LinkImpl link, DeliveryImpl previous)
    {
        _tag = tag;
        _link = link;
        _link.incrementUnsettled();
        _linkPrevious = previous;
        _messageFormat = DEFAULT_MESSAGE_FORMAT;
        if (previous !is null)
        {
            previous._linkNext = this;
        }
    }


    /** Returns: the delivery tag passed to the constructor. */
    public byte[] getTag()
    {
        return _tag;
    }


    /** Returns: the link this delivery was created on. */
    public LinkImpl getLink()
    {
        return _link;
    }


    /** Returns: the state last set through `disposition`, or null. */
    public DeliveryState getLocalState()
    {
        return _deliveryState;
    }


    /** Returns: the state last set through `setRemoteDeliveryState`, or null. */
    public DeliveryState getRemoteState()
    {
        return _remoteDeliveryState;
    }


    /** Returns: the flag last set through `setRemoteSettled`. */
    public bool remotelySettled()
    {
        return _remoteSettled;
    }


    /** Stores the message format value for this delivery. */
    public void setMessageFormat(int messageFormat)
    {
        _messageFormat = messageFormat;
    }


    /** Returns: the stored message format value. */
    public int getMessageFormat()
    {
        return _messageFormat;
    }


    /**
     * Records the local delivery state.
     *
     * The delivery is added to the transport work list unless it is already
     * settled, locally or remotely.
     */
    public void disposition(DeliveryState state)
    {
        _deliveryState = state;
        if(!_remoteSettled && !_settled)
        {
            addToTransportWorkList();
        }
    }


    /**
     * Settles the delivery locally; calling it again is a no-op.
     *
     * Decrements the link's unsettled count, then either queues transport
     * work (not yet remotely settled) or notifies the transport delivery.
     * If this is the link's current delivery the link is advanced. Finally
     * the delivery is removed from the link and unlinked from its neighbours.
     */
    public void settle()
    {
        if (_settled) {
            return;
        }

        _settled = true;
        _link.decrementUnsettled();
        if(!_remoteSettled)
        {
            addToTransportWorkList();
        }
        else
        {
            _transportDelivery.settled();
        }

        if(_link.current() is this)
        {
            _link.advance();
        }

        _link.remove(this);
        if(_linkPrevious !is null)
        {
            _linkPrevious._linkNext = _linkNext;
        }

        if(_linkNext !is null)
        {
            _linkNext._linkPrevious = _linkPrevious;
        }

        updateWork();

        _linkNext= null;
        _linkPrevious = null;
    }

    /** Returns: the following delivery in the link list, or null. */
    DeliveryImpl getLinkNext()
    {
        return _linkNext;
    }


    /** Returns: same value as `getLinkNext`. */
    public DeliveryImpl next()
    {
        return getLinkNext();
    }


    /** Settles the delivery; equivalent to calling `settle`. */
    public void free()
    {
        settle();
    }

    /** Returns: the preceding delivery in the link list, or null. */
    DeliveryImpl getLinkPrevious()
    {
        return _linkPrevious;
    }


    /**
     * Returns: the next delivery on the work list; when there is none and
     * this delivery is itself not on the work list, the connection's work
     * head; otherwise null.
     */
    public DeliveryImpl getWorkNext()
    {
        if (_workNext !is null)
            return _workNext;
        // the following hack is brought to you by the C implementation!
        if (!_work)  // not on the work list
            return (_link.getConnectionImpl()).getWorkHead();
        return null;
    }

    /** Returns: the previous delivery on the work list, or null. */
    DeliveryImpl getWorkPrev()
    {
        return _workPrev;
    }

    /** Sets the next work-list neighbour. */
    void setWorkNext(DeliveryImpl workNext)
    {
        _workNext = workNext;
    }

    /** Sets the previous work-list neighbour. */
    void setWorkPrev(DeliveryImpl workPrev)
    {
        _workPrev = workPrev;
    }

    /**
     * Copies buffered payload into `bytes`.
     *
     * Params:
     *   bytes  = destination array
     *   offset = index in `bytes` at which writing starts
     *   size   = maximum number of bytes to copy
     *
     * Returns: the number of bytes copied, or `Transport.END_OF_STREAM` when
     * nothing was copied and the delivery has been marked complete.
     */
    int recv(byte[] bytes, int offset, int size)
    {
        int consumed;
        if (_dataBuffer !is null && _dataBuffer.hasRemaining())
        {
            consumed = min(size, _dataBuffer.remaining());

            _dataBuffer.get(bytes, offset, consumed);
            _dataBuffer.reclaimRead();
        }
        else
        {
            consumed = 0;
        }

        return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;  //TODO - Implement
    }

    /**
     * Transfers buffered payload into `buffer` through `buffer.put`.
     *
     * The reported count is the smaller of the two `remaining()` values,
     * sampled before the transfer.
     *
     * Returns: that count, or `Transport.END_OF_STREAM` when it is zero and
     * the delivery has been marked complete.
     */
    int recv(WritableBuffer buffer)
    {
        int consumed;
        if (_dataBuffer !is null && _dataBuffer.hasRemaining())
        {
            consumed = min(buffer.remaining(), _dataBuffer.remaining());
            buffer.put(_dataBuffer);
            _dataBuffer.reclaimRead();
        }
        else
        {
            consumed = 0;
        }

        return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;
    }

    /**
     * Hands the current data view to the caller.
     *
     * When a view exists, this delivery drops its references to both the
     * view and the composite buffer.
     *
     * Returns: the former view, or `EMPTY_BUFFER` when there was none.
     */
    ReadableBuffer recv()
    {
        ReadableBuffer result = _dataView;
        if (_dataView !is null)
        {
            _dataView = _dataBuffer = null;
        }
        else
        {
            result = EMPTY_BUFFER;
        }

        return result;
    }

    /** Asks the owning connection to re-evaluate this delivery's work status. */
    void updateWork()
    {
        getLink().getConnectionImpl().workUpdate(this);
    }

    /**
     * Removes this delivery from the connection's transport work list.
     *
     * Returns: the transport-work successor captured before removal.
     */
    DeliveryImpl clearTransportWork()
    {
        DeliveryImpl next = _transportWorkNext;
        getLink().getConnectionImpl().removeTransportWork(this);
        return next;
    }

    /** Registers this delivery on the connection's transport work list. */
    void addToTransportWorkList()
    {
        getLink().getConnectionImpl().addTransportWork(this);
    }

    /** Returns: the next delivery on the transport work list, or null. */
    DeliveryImpl getTransportWorkNext()
    {
        return _transportWorkNext;
    }

    /** Returns: the previous delivery on the transport work list, or null. */
    DeliveryImpl getTransportWorkPrev()
    {
        return _transportWorkPrev;
    }

    /** Sets the next transport-work neighbour. */
    void setTransportWorkNext(DeliveryImpl transportWorkNext)
    {
        _transportWorkNext = transportWorkNext;
    }

    /** Sets the previous transport-work neighbour. */
    void setTransportWorkPrev(DeliveryImpl transportWorkPrev)
    {
        _transportWorkPrev = transportWorkPrev;
    }

    /** Returns: the associated transport delivery, or null if none was set. */
    TransportDelivery getTransportDelivery()
    {
        return _transportDelivery;
    }

    /** Associates a transport delivery with this delivery. */
    void setTransportDelivery(TransportDelivery transportDelivery)
    {
        _transportDelivery = transportDelivery;
    }


    /** Returns: true once `settle` has run. */
    public bool isSettled()
    {
        return _settled;
    }

    /**
     * Appends a private copy of `bytes[offset .. offset + length]` to the
     * payload and queues transport work.
     *
     * Returns: `length`.
     */
    int send(byte[] bytes, int offset, int length)
    {
        byte[] copy = new byte[length];
        //System.arraycopy(bytes, offset, copy, 0, length);
        copy[0 .. length] = bytes[offset .. offset+length];
        getOrCreateDataBuffer().append(copy);
        addToTransportWorkList();
        return length;
    }

    /**
     * Appends a copy of the remaining content of `buffer` to the payload and
     * queues transport work.
     *
     * Returns: the number of bytes `buffer` had remaining on entry.
     */
    int send(ReadableBuffer buffer)
    {
        int length = buffer.remaining();
        getOrCreateDataBuffer().append(copyContents(buffer));
        addToTransportWorkList();
        return length;
    }

    /**
     * Queues `buffer` for sending without copying when possible.
     *
     * If there is no view, or the current view is drained, `buffer` itself
     * becomes the view. Otherwise the pending data and `buffer` are
     * consolidated (copied) into the composite buffer.
     *
     * Returns: the number of bytes `buffer` had remaining on entry.
     */
    int sendNoCopy(ReadableBuffer buffer)
    {
        int length = buffer.remaining();

        if (_dataView is null || !_dataView.hasRemaining())
        {
            _dataView = buffer;
        }
        else
        {
            consolidateSendBuffers(buffer);
        }

        addToTransportWorkList();
        return length;
    }

    /**
     * Copies the remaining bytes of `buffer` into a new exactly-sized array,
     * leaving `buffer` positioned at its limit (array-backed case) or
     * advanced by the relative `get` (otherwise).
     */
    private byte[] copyContents(ReadableBuffer buffer)
    {
        // Sample once: the getters are pure reads and nothing moves the
        // buffer between here and the copy.
        int length = buffer.remaining();
        byte[] copy = new byte[length];

        if (buffer.hasArray())
        {
            // System.arraycopy(buffer.array(), buffer.arrayOffset() + buffer.position(), copy, 0, buffer.remaining());
            int start = buffer.arrayOffset() + buffer.position();
            copy[0 .. length] = buffer.array()[start .. start + length];
            buffer.position(buffer.limit());
        }
        else
        {
            buffer.get(copy, 0, length);
        }

        return copy;
    }

    /**
     * Merges `buffer` with data that is still pending in the current view.
     *
     * When the view already is the composite buffer, `buffer` is simply
     * appended. Otherwise the view is a foreign buffer from an earlier
     * `sendNoCopy`: its remaining bytes and then `buffer` are copied into
     * the composite buffer.
     */
    private void consolidateSendBuffers(ReadableBuffer buffer)
    {
        // Reference identity is required here. `==` would dispatch to
        // opEquals, which may compare buffer contents, so a foreign view
        // that merely equals the composite buffer would take this branch.
        if (_dataView is _dataBuffer)
        {
            getOrCreateDataBuffer().append(copyContents(buffer));
        }
        else
        {
            ReadableBuffer oldView = _dataView;

            CompositeReadableBuffer dataBuffer = getOrCreateDataBuffer();
            dataBuffer.append(copyContents(oldView));
            dataBuffer.append(copyContents(buffer));

            oldView.reclaimRead();
        }

        buffer.reclaimRead();  // A pooled buffer could release now.
    }

    /**
     * Appends the bytes described by `payload` to the composite buffer,
     * copying them first when `payload` is a window into a larger array.
     */
    void append(Binary payload)
    {
        byte[] data = payload.getArray();

        // The Composite buffer cannot handle composites where the array
        // is a view of a larger array so we must copy the payload into
        // an array of the exact size
        if (payload.getArrayOffset() > 0 || payload.getLength() < data.length)
        {
            data = new byte[payload.getLength()];
            // System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, 0, payload.getLength());
            data[0 .. payload.getLength()] = payload.getArray()[payload.getArrayOffset() .. payload.getArrayOffset()+payload.getLength()];
        }

        getOrCreateDataBuffer().append(data);
    }

    /**
     * Returns the composite buffer, creating it (and pointing the view at
     * it) when it does not exist yet.
     */
    private CompositeReadableBuffer getOrCreateDataBuffer()
    {
        if (_dataBuffer is null)
        {
            _dataView = _dataBuffer = new CompositeReadableBuffer();
        }

        return _dataBuffer;
    }

    /** Appends `data` (by reference) to the composite buffer. */
    void append(byte[] data)
    {
        getOrCreateDataBuffer().append(data);
    }

    /**
     * Post-send housekeeping: calls `reclaimRead` on the view and, once the
     * view is drained, points the view back at the composite buffer (which
     * may be null).
     */
    void afterSend()
    {
        if (_dataView !is null)
        {
            _dataView.reclaimRead();
            if (!_dataView.hasRemaining())
            {
                _dataView = _dataBuffer;
            }
        }
    }

    /** Returns: the current data view, or `EMPTY_BUFFER` when there is none. */
    ReadableBuffer getData()
    {
        return _dataView is null ? EMPTY_BUFFER : _dataView;
    }

    /** Returns: bytes remaining in the data view, or 0 when there is none. */
    int getDataLength()
    {
        return _dataView is null ? 0 : _dataView.remaining();
    }


    /** Returns: bytes remaining in the data view, or 0 when there is none. */
    public int available()
    {
        return _dataView is null ? 0 : _dataView.remaining();
    }


    /**
     * Returns: true when the link is a `SenderImpl`, this is the link's
     * current delivery, and the sender reports credit.
     */
    public bool isWritable()
    {
        return cast(SenderImpl)getLink() !is null
                && getLink().current() is this
                && (cast(SenderImpl) getLink()).hasCredit();
    }


    /**
     * Returns: true when the link is a `ReceiverImpl` and this is the link's
     * current delivery.
     */
    public bool isReadable()
    {
        return cast(ReceiverImpl)getLink() !is null
            && getLink().current() is this;
    }

    /** Marks the delivery as complete (no longer partial). */
    void setComplete()
    {
        _complete = true;
    }

    /** Marks the delivery as aborted. */
    void setAborted()
    {
        _aborted = true;
    }


    /** Returns: true once `setAborted` has been called. */
    public bool isAborted()
    {
        return _aborted;
    }


    /** Returns: true until `setComplete` has been called. */
    public bool isPartial()
    {
        return !_complete;
    }

    /** Stores the remote delivery state and flags the delivery as updated. */
    void setRemoteDeliveryState(DeliveryState remoteDeliveryState)
    {
        _remoteDeliveryState = remoteDeliveryState;
        _updated = true;
    }


    /** Returns: true when a remote change has been recorded since the last `clear`. */
    public bool isUpdated()
    {
        return _updated;
    }


    /** Resets the updated flag and asks the connection to re-evaluate work. */
    public void clear()
    {
        _updated = false;
        getLink().getConnectionImpl().workUpdate(this);
    }

    /** Marks the delivery as done. */
    void setDone()
    {
        _done = true;
    }

    /** Returns: true once `setDone` has been called. */
    bool isDone()
    {
        return _done;
    }

    /** Stores the remote settlement flag and flags the delivery as updated. */
    void setRemoteSettled(bool remoteSettled)
    {
        _remoteSettled = remoteSettled;
        _updated = true;
    }


    /**
     * Returns: true only for a delivery on a `SenderImpl` link that is not
     * remotely settled, not done, and either complete or still holding
     * unsent bytes in its view.
     */
    public bool isBuffered()
    {
        if (_remoteSettled) return false;
        if (cast(SenderImpl)getLink() is null) return false;
        if (isDone()) return false;

        bool hasRemaining = false;
        if (_dataView !is null) {
            hasRemaining = _dataView.hasRemaining();
        }

        return _complete || hasRemaining;
    }


    /** Returns: the application context object, or null. */
    public Object getContext()
    {
        return _context;
    }


    /** Stores an application context object. */
    public void setContext(Object context)
    {
        _context = context;
    }


    /** Returns: the attachments record, created on first access. */
    public Record attachments()
    {
        if(_attachments is null)
        {
            _attachments = new RecordImpl();
        }

        return _attachments;
    }

    //
    //public String toString()
    //{
    //    StringBuilder builder = new StringBuilder();
    //    builder.append("DeliveryImpl [_tag=").append(Arrays.toString(_tag))
    //        .append(", _link=").append(_link)
    //        .append(", _deliveryState=").append(_deliveryState)
    //        .append(", _settled=").append(_settled)
    //        .append(", _remoteSettled=").append(_remoteSettled)
    //        .append(", _remoteDeliveryState=").append(_remoteDeliveryState)
    //        .append(", _flags=").append(_flags)
    //        .append(", _defaultDeliveryState=").append(_defaultDeliveryState)
    //        .append(", _transportDelivery=").append(_transportDelivery)
    //        .append(", _data Size=").append(getDataLength())
    //        .append(", _complete=").append(_complete)
    //        .append(", _updated=").append(_updated)
    //        .append(", _done=").append(_done)
    //        .append("]");
    //    return builder.toString();
    //}


    /** Returns: bytes remaining in the data view, or 0 when there is none. */
    public int pending()
    {
        return _dataView is null ? 0 : _dataView.remaining();
    }


    /** Stores the default delivery state. */
    public void setDefaultDeliveryState(DeliveryState state)
    {
        _defaultDeliveryState = state;
    }


    /** Returns: the stored default delivery state, or null. */
    public DeliveryState getDefaultDeliveryState()
    {
        return _defaultDeliveryState;
    }
}