1 module hunt.proton.message.impl.MessageImpl;
2
3 import std.stdio;
4
5 import hunt.io.ByteBuffer;
6 import hunt.time.LocalDateTime;
7 import hunt.proton.amqp.Binary;
8 import hunt.proton.amqp.Symbol;
9 import hunt.proton.amqp.UnsignedByte;
10 import hunt.proton.amqp.UnsignedInteger;
11 import hunt.proton.amqp.messaging.Header;
12 import hunt.proton.amqp.messaging.DeliveryAnnotations;
13 import hunt.proton.amqp.messaging.MessageAnnotations;
14 import hunt.proton.amqp.messaging.Properties;
15 import hunt.proton.amqp.messaging.ApplicationProperties;
16 import hunt.proton.amqp.messaging.Section;
17 import hunt.proton.amqp.messaging.Footer;
18 import hunt.proton.codec.DecoderImpl;
19 import hunt.proton.codec.EncoderImpl;
20 import hunt.proton.message.Message;
21 import hunt.proton.message.ProtonJMessage;
22 import hunt.proton.message.MessageError;
23 import hunt.proton.codec.AMQPDefinedTypes;
24 import hunt.io.BufferUtils;
25 import hunt.proton.codec.ReadableBuffer;
26 import hunt.proton.codec.DroppingWritableBuffer;
27 import hunt.proton.codec.WritableBuffer;
28 import hunt.proton.codec.CompositeWritableBuffer;
29
30 import hunt.Boolean;
31 import hunt.String;
32 import hunt.logging;
33
34
35
/**
 * Bundles a decoder together with the encoder that is constructed on top of it.
 *
 * Construction registers all AMQP defined types on both halves through
 * AMQPDefinedTypes.registerAllTypes, so a fresh pair is immediately usable.
 */
class EncoderDecoderPair
{
    DecoderImpl decoder;
    EncoderImpl encoder;

    /// Creates the decoder first, hands it to the encoder, then registers the AMQP types on both.
    this()
    {
        this.decoder = new DecoderImpl;
        this.encoder = new EncoderImpl(this.decoder);
        AMQPDefinedTypes.registerAllTypes(this.decoder, this.encoder);
    }
}
46
/// Codec pair used by MessageImpl for encoding and decoding. A plain module-level
/// variable in D is thread-local, so every thread works with its own pair.
EncoderDecoderPair tlsCodec;


/// Module constructor (non-shared, hence run once per thread): creates this thread's codec pair.
static this()
{
    tlsCodec = new EncoderDecoderPair;
}
54
55
/**
 * ProtonJMessage implementation that keeps each message section (header,
 * delivery annotations, message annotations, properties, application
 * properties, body, footer) in its own nullable field.
 *
 * The convenience setters create the Header / Properties section lazily: when
 * the section does not exist yet and the supplied value equals the "absent"
 * default (false, 0, null, DEFAULT_PRIORITY), nothing is allocated.
 *
 * Encoding and decoding go through the module-level `tlsCodec` pair.
 */
class MessageImpl : ProtonJMessage
{
    private Header _header;
    private DeliveryAnnotations _deliveryAnnotations;
    private MessageAnnotations _messageAnnotations;
    private Properties _properties;
    private ApplicationProperties _applicationProperties;
    private Section _body;
    private Footer _footer;

    /**
     * Creates a message with every section unset.
     *
     * Application code should use hunt.proton.message.Message.Factory.create() instead.
     */
    this()
    {
    }

    /**
     * Creates a message from already-built sections; any of them may be null.
     *
     * Application code should instead use the hunt.proton.message.Message.Factory.create
     * overload that takes (Header, DeliveryAnnotations, MessageAnnotations, Properties,
     * ApplicationProperties, Section, Footer).
     */
    this(Header header, DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations,
         Properties properties, ApplicationProperties applicationProperties, Section bd, Footer footer)
    {
        this._header = header;
        this._deliveryAnnotations = deliveryAnnotations;
        this._messageAnnotations = messageAnnotations;
        this._properties = properties;
        this._applicationProperties = applicationProperties;
        this._body = bd;
        this._footer = footer;
    }

    // ------------------------------------------------------------------
    // Header convenience accessors
    // ------------------------------------------------------------------

    /// Returns the header's durable flag, or false when the header or the flag is unset.
    public bool isDurable()
    {
        auto durable = _header is null ? null : _header.getDurable();
        return durable is null ? false : durable.booleanValue;
    }

    /// Returns the header's delivery count as a long, or 0 when the header or the count is unset.
    public long getDeliveryCount()
    {
        auto count = _header is null ? null : _header.getDeliveryCount();
        return count is null ? 0 : count.longValue();
    }

    /// Returns the header's priority, or DEFAULT_PRIORITY when the header or the priority is unset.
    public short getPriority()
    {
        auto priority = _header is null ? null : _header.getPriority();
        return priority is null
            ? DEFAULT_PRIORITY
            : priority.shortValue();
    }

    /// Returns the header's first-acquirer flag, or false when the header or the flag is unset.
    public bool isFirstAcquirer()
    {
        auto firstAcquirer = _header is null ? null : _header.getFirstAcquirer();
        return firstAcquirer is null ? false : firstAcquirer.booleanValue;
    }

    /// Returns the header's ttl as a long, or 0 when the header or the ttl is unset.
    public long getTtl()
    {
        auto ttl = _header is null ? null : _header.getTtl();
        return ttl is null ? 0 : ttl.longValue();
    }

    /// Sets the durable flag. A header is only created on demand when `durable` is true.
    public void setDurable(bool durable)
    {
        if (_header is null)
        {
            if (!durable)
            {
                return;
            }
            _header = new Header();
        }
        _header.setDurable(new Boolean(durable));
    }

    /// Sets the ttl. A header is only created on demand when `ttl` is non-zero.
    public void setTtl(long ttl)
    {
        if (_header is null)
        {
            if (ttl == 0)
            {
                return;
            }
            _header = new Header();
        }
        _header.setTtl(UnsignedInteger.valueOf(ttl));
    }

    /// Sets the delivery count. A header is only created on demand when the count is non-zero.
    public void setDeliveryCount(long deliveryCount)
    {
        if (_header is null)
        {
            if (deliveryCount == 0)
            {
                return;
            }
            _header = new Header();
        }
        _header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount));
    }

    /// Sets the first-acquirer flag. A header is only created on demand when the flag is true.
    public void setFirstAcquirer(bool firstAcquirer)
    {
        if (_header is null)
        {
            if (!firstAcquirer)
            {
                return;
            }
            _header = new Header();
        }
        _header.setFirstAcquirer(new Boolean(firstAcquirer));
    }

    /// Sets the priority (narrowed to a byte). A header is only created on demand
    /// when the value differs from DEFAULT_PRIORITY.
    public void setPriority(short priority)
    {
        if (_header is null)
        {
            if (priority == DEFAULT_PRIORITY)
            {
                return;
            }
            _header = new Header();
        }
        _header.setPriority(UnsignedByte.valueOf(cast(byte) priority));
    }

    // ------------------------------------------------------------------
    // Properties convenience getters
    // ------------------------------------------------------------------

    /// Returns the message id from the properties section, or null when there is no properties section.
    public Object getMessageId()
    {
        return _properties is null ? null : _properties.getMessageId();
    }

    /// Returns the group sequence (read via intValue()), or 0 when the properties or the value is unset.
    public long getGroupSequence()
    {
        auto sequence = _properties is null ? null : _properties.getGroupSequence();
        return sequence is null ? 0 : sequence.intValue();
    }

    /// Returns the reply-to group id, or null when there is no properties section.
    public String getReplyToGroupId()
    {
        return _properties is null ? null : _properties.getReplyToGroupId();
    }

    /// Returns the creation time as epoch milliseconds, or 0 when the properties or the value is unset.
    public long getCreationTime()
    {
        auto created = _properties is null ? null : _properties.getCreationTime();
        return created is null ? 0 : created.toEpochMilli();
    }

    /// Returns the "to" field of the properties section, or null when there is no properties section.
    public String getAddress()
    {
        return _properties is null ? null : _properties.getTo();
    }

    /// Returns a fresh copy of the user id bytes, or null when the properties or the user id is unset.
    public byte[] getUserId()
    {
        if (_properties is null)
        {
            return null;
        }

        Binary userId = _properties.getUserId();
        if (userId is null)
        {
            return null;
        }

        // Copy only the Binary's own window of the backing array so callers
        // can never mutate the stored value.
        auto offset = userId.getArrayOffset();
        auto len = userId.getLength();
        byte[] id = new byte[len];
        id[] = userId.getArray()[offset .. offset + len];
        return id;
    }

    /// Returns the reply-to address, or null when there is no properties section.
    public String getReplyTo()
    {
        return _properties is null ? null : _properties.getReplyTo();
    }

    /// Returns the group id, or null when there is no properties section.
    public String getGroupId()
    {
        return _properties is null ? null : _properties.getGroupId();
    }

    /// Returns the content type as a new String, or null when the properties or the value is unset.
    public String getContentType()
    {
        auto contentType = _properties is null ? null : _properties.getContentType();
        return contentType is null ? null : new String(contentType.toString());
    }

    /// Returns the absolute expiry time as epoch milliseconds, or 0 when the properties or the value is unset.
    public long getExpiryTime()
    {
        auto expiry = _properties is null ? null : _properties.getAbsoluteExpiryTime();
        return expiry is null ? 0 : expiry.toEpochMilli();
    }

    /// Returns the correlation id, or null when there is no properties section.
    public Object getCorrelationId()
    {
        return _properties is null ? null : _properties.getCorrelationId();
    }

    /// Returns the content encoding as a new String, or null when the properties or the value is unset.
    public String getContentEncoding()
    {
        auto contentEncoding = _properties is null ? null : _properties.getContentEncoding();
        return contentEncoding is null ? null : new String(contentEncoding.toString());
    }

    /// Returns the subject, or null when there is no properties section.
    public String getSubject()
    {
        return _properties is null ? null : _properties.getSubject();
    }

    // ------------------------------------------------------------------
    // Properties convenience setters
    // ------------------------------------------------------------------

    /// Sets the group sequence (narrowed to int). Properties are only created on demand
    /// when the value is non-zero.
    public void setGroupSequence(long groupSequence)
    {
        if (_properties is null)
        {
            if (groupSequence == 0)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setGroupSequence(UnsignedInteger.valueOf(cast(int) groupSequence));
    }

    /// Stores a private copy of `userId`. Passing null clears the user id on an existing
    /// properties section and never creates one.
    public void setUserId(byte[] userId)
    {
        if (userId is null)
        {
            if (_properties !is null)
            {
                _properties.setUserId(null);
            }
            return;
        }

        if (_properties is null)
        {
            _properties = new Properties();
        }
        // Copy so later changes to the caller's array do not leak into the message.
        byte[] id = new byte[userId.length];
        id[] = userId[];
        _properties.setUserId(new Binary(id));
    }

    /// Sets the creation time from epoch milliseconds. Properties are only created on demand
    /// when the value is non-zero.
    public void setCreationTime(long creationTime)
    {
        if (_properties is null)
        {
            if (creationTime == 0)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setCreationTime(Date.ofEpochMilli(creationTime));
    }

    /// Sets the subject. Properties are only created on demand when the value is non-null.
    public void setSubject(String subject)
    {
        if (_properties is null)
        {
            if (subject is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setSubject(subject);
    }

    /// Sets the group id. Properties are only created on demand when the value is non-null.
    public void setGroupId(String groupId)
    {
        if (_properties is null)
        {
            if (groupId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setGroupId(groupId);
    }

    /// Sets the "to" field. Properties are only created on demand when the value is non-null.
    public void setAddress(String to)
    {
        if (_properties is null)
        {
            if (to is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setTo(to);
    }

    /// Sets the absolute expiry time from epoch milliseconds. Properties are only created
    /// on demand when the value is non-zero.
    public void setExpiryTime(long absoluteExpiryTime)
    {
        if (_properties is null)
        {
            if (absoluteExpiryTime == 0)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setAbsoluteExpiryTime(Date.ofEpochMilli(absoluteExpiryTime));
    }

    /// Sets the reply-to group id. Properties are only created on demand when the value is non-null.
    public void setReplyToGroupId(String replyToGroupId)
    {
        if (_properties is null)
        {
            if (replyToGroupId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setReplyToGroupId(replyToGroupId);
    }

    /// Sets the content encoding. Properties are only created on demand when the value is
    /// non-null; passing null on an existing properties section clears the field.
    public void setContentEncoding(String contentEncoding)
    {
        if (_properties is null)
        {
            if (contentEncoding is null)
            {
                return;
            }
            _properties = new Properties();
        }
        // A null argument with existing properties means "clear": it must not be
        // dereferenced to obtain its bytes.
        _properties.setContentEncoding(contentEncoding is null
            ? null
            : Symbol.valueOf(cast(string)(contentEncoding.getBytes())));
    }

    /// Sets the content type. Properties are only created on demand when the value is
    /// non-null; passing null on an existing properties section clears the field.
    public void setContentType(String contentType)
    {
        if (_properties is null)
        {
            if (contentType is null)
            {
                return;
            }
            _properties = new Properties();
        }
        // A null argument with existing properties means "clear": it must not be
        // dereferenced to obtain its bytes.
        _properties.setContentType(contentType is null
            ? null
            : Symbol.valueOf(cast(string)(contentType.getBytes())));
    }

    /// Sets the reply-to address. Properties are only created on demand when the value is non-null.
    public void setReplyTo(String replyTo)
    {
        if (_properties is null)
        {
            if (replyTo is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setReplyTo(replyTo);
    }

    /// Sets the correlation id. Properties are only created on demand when the value is non-null.
    public void setCorrelationId(String correlationId)
    {
        if (_properties is null)
        {
            if (correlationId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setCorrelationId(correlationId);
    }

    /// Sets the message id. Properties are only created on demand when the value is non-null.
    public void setMessageId(String messageId)
    {
        if (_properties is null)
        {
            if (messageId is null)
            {
                return;
            }
            _properties = new Properties();
        }
        _properties.setMessageId(messageId);
    }

    // ------------------------------------------------------------------
    // Raw section accessors
    // ------------------------------------------------------------------

    /// Returns the header section; may be null.
    public Header getHeader()
    {
        return _header;
    }

    /// Returns the delivery annotations section; may be null.
    public DeliveryAnnotations getDeliveryAnnotations()
    {
        return _deliveryAnnotations;
    }

    /// Returns the message annotations section; may be null.
    public MessageAnnotations getMessageAnnotations()
    {
        return _messageAnnotations;
    }

    /// Returns the properties section; may be null.
    public Properties getProperties()
    {
        return _properties;
    }

    /// Returns the application properties section; may be null.
    public ApplicationProperties getApplicationProperties()
    {
        return _applicationProperties;
    }

    /// Returns the body section; may be null.
    public Section getBody()
    {
        return _body;
    }

    /// Returns the footer section; may be null.
    public Footer getFooter()
    {
        return _footer;
    }

    /// Replaces the header section (null removes it).
    public void setHeader(Header header)
    {
        _header = header;
    }

    /// Replaces the delivery annotations section (null removes it).
    public void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations)
    {
        _deliveryAnnotations = deliveryAnnotations;
    }

    /// Replaces the message annotations section (null removes it).
    public void setMessageAnnotations(MessageAnnotations messageAnnotations)
    {
        _messageAnnotations = messageAnnotations;
    }

    /// Replaces the properties section (null removes it).
    public void setProperties(Properties properties)
    {
        _properties = properties;
    }

    /// Replaces the application properties section (null removes it).
    public void setApplicationProperties(ApplicationProperties applicationProperties)
    {
        _applicationProperties = applicationProperties;
    }

    /// Replaces the body section (null removes it).
    public void setBody(Section body)
    {
        _body = body;
    }

    /// Replaces the footer section (null removes it).
    public void setFooter(Footer footer)
    {
        _footer = footer;
    }

    // ------------------------------------------------------------------
    // Decoding
    // ------------------------------------------------------------------

    /**
     * Decodes a message from a region of a byte array.
     *
     * The region is turned into a ByteBuffer with BufferUtils.toBuffer and
     * handed to decode(ByteBuffer).
     *
     * Returns: `length` minus whatever the buffer reports as remaining afterwards.
     */
    public int decode(byte[] data, int offset, int length)
    {
        ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
        decode(buffer);

        return length - buffer.remaining();
    }

    /// Decodes a message from `buffer` by wrapping it in a ByteBufferReader.
    public void decode(ByteBuffer buffer)
    {
        decode(ByteBufferReader.wrap(buffer));
    }

    /**
     * Replaces all sections of this message with those read from `buffer`.
     *
     * Every section field is reset first. Objects are then read one at a time
     * with the thread-local decoder and matched, in this fixed order, against
     * Header, DeliveryAnnotations, MessageAnnotations, Properties and
     * ApplicationProperties; the next non-Footer section becomes the body and
     * a Footer, if present at that point, becomes the footer. Sections that
     * are absent stay null.
     *
     * The decoder's buffer reference is cleared again when this method exits,
     * including when reading throws.
     */
    public void decode(ReadableBuffer buffer)
    {
        DecoderImpl decoder = tlsCodec.decoder;
        decoder.setBuffer(buffer);
        // Always detach the caller's buffer from the shared per-thread decoder,
        // even if readObject() throws part-way through.
        scope(exit) decoder.setBuffer(null);

        // Reads the next object as a Section, or yields null once the buffer is drained.
        Section nextSection()
        {
            return buffer.hasRemaining() ? cast(Section) decoder.readObject() : null;
        }

        _header = null;
        _deliveryAnnotations = null;
        _messageAnnotations = null;
        _properties = null;
        _applicationProperties = null;
        _body = null;
        _footer = null;

        Section section = nextSection();

        // Each cast yields null when the current section is of a different type;
        // in that case the same section is offered to the next candidate.
        _header = cast(Header) section;
        if (_header !is null)
        {
            section = nextSection();
        }

        _deliveryAnnotations = cast(DeliveryAnnotations) section;
        if (_deliveryAnnotations !is null)
        {
            section = nextSection();
        }

        _messageAnnotations = cast(MessageAnnotations) section;
        if (_messageAnnotations !is null)
        {
            section = nextSection();
        }

        _properties = cast(Properties) section;
        if (_properties !is null)
        {
            section = nextSection();
        }

        _applicationProperties = cast(ApplicationProperties) section;
        if (_applicationProperties !is null)
        {
            section = nextSection();
        }

        // Anything left that is not a footer is taken as the body.
        if (section !is null && (cast(Footer) section is null))
        {
            _body = section;
            section = nextSection();
        }

        _footer = cast(Footer) section;
    }

    // ------------------------------------------------------------------
    // Encoding
    // ------------------------------------------------------------------

    /**
     * Encodes this message into a region of a byte array.
     *
     * The region is turned into a ByteBuffer with BufferUtils.toBuffer, wrapped
     * in a ByteBufferWrapper and passed to encode(WritableBuffer).
     *
     * Returns: the value returned by encode(WritableBuffer).
     */
    public int encode(byte[] data, int offset, int length)
    {
        ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
        return encode(new ByteBufferWrapper(buffer));
    }

    /**
     * Encodes this message into a composite of the given array region followed
     * by a DroppingWritableBuffer.
     *
     * Returns: how far the composite buffer's position advanced during encoding.
     * NOTE(review): presumably this lets callers learn the full encoded size even
     * when the array region is too small — confirm against DroppingWritableBuffer.
     */
    public int encode2(byte[] data, int offset, int length)
    {
        ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
        ByteBufferWrapper first = new ByteBufferWrapper(buffer);
        DroppingWritableBuffer second = new DroppingWritableBuffer();
        CompositeWritableBuffer composite = new CompositeWritableBuffer(first, second);
        int start = composite.position();
        encode(composite);
        return composite.position() - start;
    }

    /**
     * Writes every non-null section to `buffer` in the order header, delivery
     * annotations, message annotations, properties, application properties,
     * body, footer, using the thread-local encoder.
     *
     * The encoder's buffer reference is cleared again when this method exits,
     * including when writing throws.
     *
     * Returns: the decrease in `buffer.remaining()` caused by the encoding.
     */
    public int encode(WritableBuffer buffer)
    {
        int length = buffer.remaining();
        EncoderImpl encoder = tlsCodec.encoder;
        encoder.setByteBuffer(buffer);
        // Always detach the caller's buffer from the shared per-thread encoder,
        // even if writeObject() throws part-way through.
        scope(exit) encoder.setByteBuffer(cast(WritableBuffer) null);

        if (getHeader() !is null)
        {
            encoder.writeObject(getHeader());
        }
        if (getDeliveryAnnotations() !is null)
        {
            encoder.writeObject(getDeliveryAnnotations());
        }
        if (getMessageAnnotations() !is null)
        {
            encoder.writeObject(getMessageAnnotations());
        }
        if (getProperties() !is null)
        {
            encoder.writeObject(getProperties());
        }
        if (getApplicationProperties() !is null)
        {
            encoder.writeObject(getApplicationProperties());
        }
        if (getBody() !is null)
        {
            encoder.writeObject(cast(Object) getBody());
        }
        if (getFooter() !is null)
        {
            encoder.writeObject(getFooter());
        }

        return length - buffer.remaining();
    }

    /// Drops the body section. All other sections are left untouched.
    public void clear()
    {
        _body = null;
    }

    /// Always reports MessageError.OK.
    public MessageError getError()
    {
        return MessageError.OK;
    }

    // NOTE(review): the toString below is disabled and still uses StringBuilder;
    // it appears not to have been ported yet.
    //public String toString()
    //{
    //    StringBuilder sb = new StringBuilder();
    //    sb.append("Message{");
    //    if (_header !is null) {
    //        sb.append("header=");
    //        sb.append(_header);
    //    }
    //    if (_properties !is null) {
    //        sb.append("properties=");
    //        sb.append(_properties);
    //    }
    //    if (_messageAnnotations !is null) {
    //        sb.append("message_annotations=");
    //        sb.append(_messageAnnotations);
    //    }
    //    if (_body !is null) {
    //        sb.append("body=");
    //        sb.append(_body);
    //    }
    //    sb.append("}");
    //    return sb.toString();
    //}

}
797