module hunt.proton.message.impl.MessageImpl;

import std.stdio;

import hunt.io.ByteBuffer;
import hunt.time.LocalDateTime;
import hunt.proton.amqp.Binary;
import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.UnsignedByte;
import hunt.proton.amqp.UnsignedInteger;
import hunt.proton.amqp.messaging.Header;
import hunt.proton.amqp.messaging.DeliveryAnnotations;
import hunt.proton.amqp.messaging.MessageAnnotations;
import hunt.proton.amqp.messaging.Properties;
import hunt.proton.amqp.messaging.ApplicationProperties;
import hunt.proton.amqp.messaging.Section;
import hunt.proton.amqp.messaging.Footer;
import hunt.proton.codec.DecoderImpl;
import hunt.proton.codec.EncoderImpl;
import hunt.proton.message.Message;
import hunt.proton.message.ProtonJMessage;
import hunt.proton.message.MessageError;
import hunt.proton.codec.AMQPDefinedTypes;
import hunt.io.BufferUtils;
import hunt.proton.codec.ReadableBuffer;
import hunt.proton.codec.DroppingWritableBuffer;
import hunt.proton.codec.WritableBuffer;
import hunt.proton.codec.CompositeWritableBuffer;

import hunt.Boolean;
import hunt.String;
import hunt.logging;


/**
 * Bundles a decoder with the encoder that is built on top of it, with all
 * AMQP-defined types registered on both.
 */
class EncoderDecoderPair {
    DecoderImpl decoder;
    EncoderImpl encoder;

    this()
    {
        decoder = new DecoderImpl();
        encoder = new EncoderImpl(decoder);
        AMQPDefinedTypes.registerAllTypes(decoder, encoder);
    }
}

/**
 * Codec pair used by MessageImpl.encode / MessageImpl.decode.
 *
 * A module-level variable in D is thread-local unless marked `shared` or
 * `__gshared`, so every thread owns its own instance (this replaces the
 * `ThreadLocal<EncoderDecoderPair>` of the Java original).
 */
EncoderDecoderPair tlsCodec;

// A non-shared module constructor runs once per thread, which gives each
// thread its own freshly initialised codec pair.
static this()
{
    tlsCodec = new EncoderDecoderPair();
}


/**
 * Message implementation that holds the individual message sections
 * (header, annotations, properties, body, footer) and can encode them to /
 * decode them from a buffer.
 *
 * The Header and Properties sections are created lazily: a convenience setter
 * only allocates the section when it is given a non-default value.
 */
class MessageImpl : ProtonJMessage
{
    private Header _header;
    private DeliveryAnnotations _deliveryAnnotations;
    private MessageAnnotations _messageAnnotations;
    private Properties _properties;
    private ApplicationProperties _applicationProperties;
    private Section _body;
    private Footer _footer;

    /**
     * Creates an empty message with every section unset.
     *
     * Application code should use hunt.proton.message.Message.Factory.create() instead.
     */
    this()
    {
    }

    /**
     * Creates a message from already-built sections; any of them may be null.
     *
     * Application code should instead use
     * hunt.proton.message.Message.Factory.create(Header, DeliveryAnnotations,
     * MessageAnnotations, Properties, ApplicationProperties, Section, Footer).
     */
    this(Header header, DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations,
         Properties properties, ApplicationProperties applicationProperties, Section bd, Footer footer)
    {
        _header = header;
        _deliveryAnnotations = deliveryAnnotations;
        _messageAnnotations = messageAnnotations;
        _properties = properties;
        _applicationProperties = applicationProperties;
        _body = bd;
        _footer = footer;
    }

    // ------------------------------------------------------------------
    // Header convenience accessors
    // ------------------------------------------------------------------

    /// Returns the header's durable flag, or false when the header or the field is unset.
    public bool isDurable()
    {
        return (_header is null || _header.getDurable() is null) ? false : _header.getDurable().booleanValue;
    }

    /// Returns the header's delivery count, or 0 when the header or the field is unset.
    public long getDeliveryCount()
    {
        return (_header is null || _header.getDeliveryCount() is null) ? 0 : _header.getDeliveryCount().longValue();
    }

    /// Returns the header's priority, or DEFAULT_PRIORITY when the header or the field is unset.
    public short getPriority()
    {
        return (_header is null || _header.getPriority() is null)
            ? DEFAULT_PRIORITY
            : _header.getPriority().shortValue();
    }

    /// Returns the header's first-acquirer flag, or false when the header or the field is unset.
    public bool isFirstAcquirer()
    {
        return (_header is null || _header.getFirstAcquirer() is null) ? false : _header.getFirstAcquirer().booleanValue;
    }

    /// Returns the header's ttl, or 0 when the header or the field is unset.
    public long getTtl()
    {
        return (_header is null || _header.getTtl() is null) ? 0 : _header.getTtl().longValue();
    }

    /// Sets the durable flag; no header is created just to store `false`.
    public void setDurable(bool durable)
    {
        if (_header is null)
        {
            if (!durable)
            {
                return;
            }
            _header = new Header();
        }
        _header.setDurable(new Boolean(durable));
    }

    /// Sets the ttl; no header is created just to store 0.
    public void setTtl(long ttl)
    {
        if (_header is null)
        {
            if (ttl == 0)
            {
                return;
            }
            _header = new Header();
        }
        _header.setTtl(UnsignedInteger.valueOf(ttl));
    }

    /// Sets the delivery count; no header is created just to store 0.
    public void setDeliveryCount(long deliveryCount)
    {
        if (_header is null)
        {
            if (deliveryCount == 0)
            {
                return;
            }
            _header = new Header();
        }
        _header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount));
    }

    /// Sets the first-acquirer flag; no header is created just to store `false`.
    public void setFirstAcquirer(bool firstAcquirer)
    {
        if (_header is null)
        {
            if (!firstAcquirer)
            {
                return;
            }
            _header = new Header();
        }
        _header.setFirstAcquirer(new Boolean(firstAcquirer));
    }

    /// Sets the priority; no header is created just to store DEFAULT_PRIORITY.
    public void setPriority(short priority)
    {
        if (_header is null)
        {
            if (priority == DEFAULT_PRIORITY)
            {
                return;
            }
            _header = new Header();
        }
        // The value is narrowed to a byte before being wrapped.
        _header.setPriority(UnsignedByte.valueOf(cast(byte) priority));
    }

    // ------------------------------------------------------------------
    // Properties convenience getters
    // ------------------------------------------------------------------

    /// Returns the message id, or null when there is no properties section.
    public Object getMessageId()
    {
        return _properties is null ? null : _properties.getMessageId();
    }

    /// Returns the group sequence, or 0 when the properties section or the field is unset.
    public long getGroupSequence()
    {
        return (_properties is null || _properties.getGroupSequence() is null) ? 0 : _properties.getGroupSequence().intValue();
    }

    /// Returns the reply-to group id, or null when there is no properties section.
    public String getReplyToGroupId()
    {
        return _properties is null ? null : _properties.getReplyToGroupId();
    }

    /// Returns the creation time in epoch milliseconds, or 0 when unset.
    public long getCreationTime()
    {
        return (_properties is null || _properties.getCreationTime() is null) ? 0 : _properties.getCreationTime().toEpochMilli();
    }

    /// Returns the `to` field of the properties, or null when there is no properties section.
    public String getAddress()
    {
        return _properties is null ? null : _properties.getTo();
    }

    /// Returns a fresh copy of the user id bytes, or null when unset.
    public byte[] getUserId()
    {
        if (_properties is null || _properties.getUserId() is null)
        {
            return null;
        }

        Binary userId = _properties.getUserId();
        auto start = userId.getArrayOffset();
        auto count = userId.getLength();

        // Copy only the window of the backing array that the Binary exposes.
        byte[] id = new byte[count];
        id[0 .. count] = userId.getArray()[start .. start + count];
        return id;
    }

    /// Returns the reply-to address, or null when there is no properties section.
    public String getReplyTo()
    {
        return _properties is null ? null : _properties.getReplyTo();
    }

    /// Returns the group id, or null when there is no properties section.
    public String getGroupId()
    {
        return _properties is null ? null : _properties.getGroupId();
    }

    /// Returns the content type as a new String, or null when unset.
    public String getContentType()
    {
        return (_properties is null || _properties.getContentType() is null) ? null : new String(_properties.getContentType().toString());
    }

    /// Returns the absolute expiry time in epoch milliseconds, or 0 when unset.
    public long getExpiryTime()
    {
        return (_properties is null || _properties.getAbsoluteExpiryTime() is null) ? 0 : _properties.getAbsoluteExpiryTime().toEpochMilli();
    }

    /// Returns the correlation id, or null when there is no properties section.
    public Object getCorrelationId()
    {
        return (_properties is null) ? null : _properties.getCorrelationId();
    }

    /// Returns the content encoding as a new String, or null when unset.
    public String getContentEncoding()
    {
        return (_properties is null || _properties.getContentEncoding() is null) ? null : new String (_properties.getContentEncoding().toString());
    }

    /// Returns the subject, or null when there is no properties section.
    public String getSubject()
    {
        return _properties is null ? null : _properties.getSubject();
    }

    // ------------------------------------------------------------------
    // Properties convenience setters
    // ------------------------------------------------------------------

    /// Sets the group sequence; no properties section is created just to store 0.
    public void setGroupSequence(long groupSequence)
    {
        if (_properties is null)
        {
            if (groupSequence == 0)
            {
                return;
            }
            _properties = new Properties();
        }
        // The value is narrowed to an int before being wrapped.
        _properties.setGroupSequence(UnsignedInteger.valueOf(cast(int) groupSequence));
    }

    /**
     * Sets the user id from a copy of the given bytes.
     *
     * A null argument clears the field on an existing properties section and
     * does nothing when there is no properties section.
     */
    public void setUserId(byte[] userId)
    {
        if (userId is null)
        {
            if (_properties !is null)
            {
                _properties.setUserId(null);
            }
            return;
        }

        if (_properties is null)
        {
            _properties = new Properties();
        }

        // Defensive copy so later changes to the caller's array do not leak in.
        byte[] id = new byte[userId.length];
        id[0 .. userId.length] = userId[0 .. userId.length];
        _properties.setUserId(new Binary(id));
    }

    /// Sets the creation time (epoch milliseconds); no properties section is created just to store 0.
    public void setCreationTime(long creationTime)
    {
        if (_properties is null)
        {
            if (creationTime == 0)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setCreationTime(Date.ofEpochMilli(creationTime));
    }

    /// Sets the subject; no properties section is created just to store null.
    public void setSubject(String subject)
    {
        if (_properties is null)
        {
            if (subject is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setSubject(subject);
    }

    /// Sets the group id; no properties section is created just to store null.
    public void setGroupId(String groupId)
    {
        if (_properties is null)
        {
            if (groupId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setGroupId(groupId);
    }

    /// Sets the `to` field; no properties section is created just to store null.
    public void setAddress(String to)
    {
        if (_properties is null)
        {
            if (to is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setTo(to);
    }

    /// Sets the absolute expiry time (epoch milliseconds); no properties section is created just to store 0.
    public void setExpiryTime(long absoluteExpiryTime)
    {
        if (_properties is null)
        {
            if (absoluteExpiryTime == 0)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setAbsoluteExpiryTime(Date.ofEpochMilli(absoluteExpiryTime));
    }

    /// Sets the reply-to group id; no properties section is created just to store null.
    public void setReplyToGroupId(String replyToGroupId)
    {
        if (_properties is null)
        {
            if (replyToGroupId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setReplyToGroupId(replyToGroupId);
    }

    /**
     * Sets the content encoding; no properties section is created just to
     * store null. A null argument clears the field on an existing section.
     */
    public void setContentEncoding(String contentEncoding)
    {
        if (_properties is null)
        {
            if (contentEncoding is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setContentEncoding(toSymbol(contentEncoding));
    }

    /**
     * Sets the content type; no properties section is created just to store
     * null. A null argument clears the field on an existing section.
     */
    public void setContentType(String contentType)
    {
        if (_properties is null)
        {
            if (contentType is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setContentType(toSymbol(contentType));
    }

    /// Sets the reply-to address; no properties section is created just to store null.
    public void setReplyTo(String replyTo)
    {
        if (_properties is null)
        {
            if (replyTo is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setReplyTo(replyTo);
    }

    /// Sets the correlation id; no properties section is created just to store null.
    public void setCorrelationId(String correlationId)
    {
        if (_properties is null)
        {
            if (correlationId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setCorrelationId(correlationId);
    }

    /// Sets the message id; no properties section is created just to store null.
    public void setMessageId(String messageId)
    {
        if (_properties is null)
        {
            if (messageId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setMessageId(messageId);
    }

    // ------------------------------------------------------------------
    // Raw section accessors
    // ------------------------------------------------------------------

    /// Returns the header section (may be null).
    public Header getHeader()
    {
        return _header;
    }

    /// Returns the delivery-annotations section (may be null).
    public DeliveryAnnotations getDeliveryAnnotations()
    {
        return _deliveryAnnotations;
    }

    /// Returns the message-annotations section (may be null).
    public MessageAnnotations getMessageAnnotations()
    {
        return _messageAnnotations;
    }

    /// Returns the properties section (may be null).
    public Properties getProperties()
    {
        return _properties;
    }

    /// Returns the application-properties section (may be null).
    public ApplicationProperties getApplicationProperties()
    {
        return _applicationProperties;
    }

    /// Returns the body section (may be null).
    public Section getBody()
    {
        return _body;
    }

    /// Returns the footer section (may be null).
    public Footer getFooter()
    {
        return _footer;
    }

    /// Replaces the header section.
    public void setHeader(Header header)
    {
        _header = header;
    }

    /// Replaces the delivery-annotations section.
    public void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations)
    {
        _deliveryAnnotations = deliveryAnnotations;
    }

    /// Replaces the message-annotations section.
    public void setMessageAnnotations(MessageAnnotations messageAnnotations)
    {
        _messageAnnotations = messageAnnotations;
    }

    /// Replaces the properties section.
    public void setProperties(Properties properties)
    {
        _properties = properties;
    }

    /// Replaces the application-properties section.
    public void setApplicationProperties(ApplicationProperties applicationProperties)
    {
        _applicationProperties = applicationProperties;
    }

    /// Replaces the body section.
    public void setBody(Section body)
    {
        _body = body;
    }

    /// Replaces the footer section.
    public void setFooter(Footer footer)
    {
        _footer = footer;
    }

    // ------------------------------------------------------------------
    // Decoding
    // ------------------------------------------------------------------

    /**
     * Decodes a message from `length` bytes of `data` starting at `offset`.
     *
     * Returns: `length` minus what the wrapping buffer reports as remaining
     *          after decoding.
     */
    public int decode(byte[] data, int offset, int length)
    {
        ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
        decode(buffer);

        return length-buffer.remaining();
    }

    /// Decodes a message from the given ByteBuffer.
    public void decode(ByteBuffer buffer)
    {
        decode(ByteBufferReader.wrap(buffer));
    }

    /**
     * Replaces every section of this message with what is decoded from `buffer`.
     *
     * All sections are reset first. Sections are then matched in the fixed
     * order header, delivery annotations, message annotations, properties,
     * application properties, body, footer; a section that is absent from the
     * buffer stays null.
     */
    public void decode(ReadableBuffer buffer)
    {
        DecoderImpl decoder = tlsCodec.decoder;
        decoder.setBuffer(buffer);
        // Detach the per-thread decoder from the caller's buffer even when
        // readObject() throws, so no stale reference is kept.
        scope(exit) decoder.setBuffer(null);

        _header = null;
        _deliveryAnnotations = null;
        _messageAnnotations = null;
        _properties = null;
        _applicationProperties = null;
        _body = null;
        _footer = null;

        Section section = readNextSection(decoder, buffer);

        // Each cast yields null when `section` is of another type; in that
        // case the same section is offered to the next candidate below.
        _header = cast(Header) section;
        if (_header !is null)
        {
            section = readNextSection(decoder, buffer);
        }

        _deliveryAnnotations = cast(DeliveryAnnotations) section;
        if (_deliveryAnnotations !is null)
        {
            section = readNextSection(decoder, buffer);
        }

        _messageAnnotations = cast(MessageAnnotations) section;
        if (_messageAnnotations !is null)
        {
            section = readNextSection(decoder, buffer);
        }

        _properties = cast(Properties) section;
        if (_properties !is null)
        {
            section = readNextSection(decoder, buffer);
        }

        _applicationProperties = cast(ApplicationProperties) section;
        if (_applicationProperties !is null)
        {
            section = readNextSection(decoder, buffer);
        }

        // Whatever is left and is not a footer is taken as the body.
        if (section !is null && (cast(Footer) section is null))
        {
            _body = section;
            section = readNextSection(decoder, buffer);
        }

        _footer = cast(Footer) section;
    }

    /// Reads the next object from the decoder as a Section, or returns null when the buffer is exhausted.
    private static Section readNextSection(DecoderImpl decoder, ReadableBuffer buffer)
    {
        if (!buffer.hasRemaining())
        {
            return null;
        }
        return cast(Section) decoder.readObject();
    }

    /// Converts a String to a Symbol, mapping null to null instead of dereferencing it.
    private static Symbol toSymbol(String value)
    {
        if (value is null)
        {
            return null;
        }
        return Symbol.valueOf(cast(string)(value.getBytes()));
    }

    // ------------------------------------------------------------------
    // Encoding
    // ------------------------------------------------------------------

    /**
     * Encodes this message into `length` bytes of `data` starting at `offset`.
     *
     * Returns: the value returned by encode(WritableBuffer).
     */
    public int encode(byte[] data, int offset, int length)
    {
        ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
        return encode(new ByteBufferWrapper(buffer));
    }

    /**
     * Encodes this message through a composite of the given array region and
     * a DroppingWritableBuffer.
     *
     * Returns: the distance the composite buffer's position advanced.
     * NOTE(review): presumably this reports the full encoded size even when
     * it exceeds `length` — confirm against CompositeWritableBuffer.
     */
    public int encode2(byte[] data, int offset, int length)
    {
        ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
        ByteBufferWrapper first = new ByteBufferWrapper(buffer);
        DroppingWritableBuffer second = new DroppingWritableBuffer();
        CompositeWritableBuffer composite = new CompositeWritableBuffer(first, second);
        int start = composite.position();
        encode(composite);
        return composite.position() - start;
    }

    /**
     * Writes every non-null section to `buffer` in the order header, delivery
     * annotations, message annotations, properties, application properties,
     * body, footer.
     *
     * Returns: how much `buffer.remaining()` shrank during the call.
     */
    public int encode(WritableBuffer buffer)
    {
        int length = buffer.remaining();
        EncoderImpl encoder = tlsCodec.encoder;
        encoder.setByteBuffer(buffer);
        // Detach the per-thread encoder from the caller's buffer even when
        // writeObject() throws, so no stale reference is kept.
        scope(exit) encoder.setByteBuffer(cast(WritableBuffer)null);

        if (getHeader() !is null)
        {
            encoder.writeObject(getHeader());
        }
        if (getDeliveryAnnotations() !is null)
        {
            encoder.writeObject(getDeliveryAnnotations());
        }
        if (getMessageAnnotations() !is null)
        {
            encoder.writeObject(getMessageAnnotations());
        }
        if (getProperties() !is null)
        {
            encoder.writeObject(getProperties());
        }
        if (getApplicationProperties() !is null)
        {
            encoder.writeObject(getApplicationProperties());
        }
        if (getBody() !is null)
        {
            encoder.writeObject(cast(Object)getBody());
        }
        if (getFooter() !is null)
        {
            encoder.writeObject(getFooter());
        }

        return length - buffer.remaining();
    }

    /// Clears the body section only; every other section is left untouched.
    public void clear()
    {
        _body = null;
    }

    /// Always returns MessageError.OK.
    public MessageError getError()
    {
        return MessageError.OK;
    }

    //public String toString()
    //{
    //    StringBuilder sb = new StringBuilder();
    //    sb.append("Message{");
    //    if (_header !is null) {
    //        sb.append("header=");
    //        sb.append(_header);
    //    }
    //    if (_properties !is null) {
    //        sb.append("properties=");
    //        sb.append(_properties);
    //    }
    //    if (_messageAnnotations !is null) {
    //        sb.append("message_annotations=");
    //        sb.append(_messageAnnotations);
    //    }
    //    if (_body !is null) {
    //        sb.append("body=");
    //        sb.append(_body);
    //    }
    //    sb.append("}");
    //    return sb.toString();
    //}

}