1 module hunt.proton.message.impl.MessageImpl;
2 
3 import std.stdio;
4 
5 import hunt.io.ByteBuffer;
6 import hunt.time.LocalDateTime;
7 import hunt.proton.amqp.Binary;
8 import hunt.proton.amqp.Symbol;
9 import hunt.proton.amqp.UnsignedByte;
10 import hunt.proton.amqp.UnsignedInteger;
11 import hunt.proton.amqp.messaging.Header;
12 import hunt.proton.amqp.messaging.DeliveryAnnotations;
13 import hunt.proton.amqp.messaging.MessageAnnotations;
14 import hunt.proton.amqp.messaging.Properties;
15 import hunt.proton.amqp.messaging.ApplicationProperties;
16 import hunt.proton.amqp.messaging.Section;
17 import hunt.proton.amqp.messaging.Footer;
18 import hunt.proton.codec.DecoderImpl;
19 import hunt.proton.codec.EncoderImpl;
20 import hunt.proton.message.Message;
21 import hunt.proton.message.ProtonJMessage;
22 import hunt.proton.message.MessageError;
23 import hunt.proton.codec.AMQPDefinedTypes;
24 import hunt.io.BufferUtils;
25 import hunt.proton.codec.ReadableBuffer;
26 import hunt.proton.codec.DroppingWritableBuffer;
27 import hunt.proton.codec.WritableBuffer;
28 import hunt.proton.codec.CompositeWritableBuffer;
29 
30 import hunt.Boolean;
31 import hunt.String;
32 import hunt.logging;
33 
34 
35 
36 class EncoderDecoderPair {
37   DecoderImpl decoder;
38   EncoderImpl encoder;
39   this()
40   {
41       decoder = new DecoderImpl();
42       encoder = new EncoderImpl(decoder);
43       AMQPDefinedTypes.registerAllTypes(decoder, encoder);
44   }
45 }
46 
47 EncoderDecoderPair tlsCodec;
48 
49 
50 static this()
51 {
52     tlsCodec = new EncoderDecoderPair();
53 }
54 
55 
56 class MessageImpl : ProtonJMessage
57 {
58     private Header _header;
59     private DeliveryAnnotations _deliveryAnnotations;
60     private MessageAnnotations _messageAnnotations;
61     private Properties _properties;
62     private ApplicationProperties _applicationProperties;
63     private Section _body;
64     private Footer _footer;
65 
66 
67 
68     //private static  ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() {
69     //       protected EncoderDecoderPair initialValue() {
70     //        return new EncoderDecoderPair();
71     //      }
72     //  };
73 
74     /**
75      * Application code should use {@link hunt.proton.message.Message.Factory#create()} instead.
76      */
77     this()
78     {
79     }
80 
81     /**
82      * Application code should instead use
83      * {@link hunt.proton.message.Message.Factory#create(Header, DeliveryAnnotations, MessageAnnotations, Properties, ApplicationProperties, Section, Footer)}
84      */
85     this(Header header, DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations,
86                        Properties properties, ApplicationProperties applicationProperties, Section bd, Footer footer)
87     {
88         _header = header;
89         _deliveryAnnotations = deliveryAnnotations;
90         _messageAnnotations = messageAnnotations;
91         _properties = properties;
92         _applicationProperties = applicationProperties;
93         _body = bd;
94         _footer = footer;
95     }
96 
97     public bool isDurable()
98     {
99         return (_header is null || _header.getDurable() is null) ? false : _header.getDurable().booleanValue;
100     }
101 
102 
103     public long getDeliveryCount()
104     {
105         return (_header is null || _header.getDeliveryCount() is null) ? 0 : _header.getDeliveryCount().longValue();
106     }
107 
108 
109     public short getPriority()
110     {
111         return (_header is null || _header.getPriority() is null)
112         ? DEFAULT_PRIORITY
113         : _header.getPriority().shortValue();
114     }
115 
116     public bool isFirstAcquirer()
117     {
118         return (_header is null || _header.getFirstAcquirer() is null) ? false : _header.getFirstAcquirer().booleanValue;
119     }
120 
121     public long getTtl()
122     {
123         return (_header is null || _header.getTtl() is null) ? 0 : _header.getTtl().longValue();
124     }
125 
126     public void setDurable(bool durable)
127     {
128         if (_header is null)
129         {
130             if (durable)
131             {
132                 _header = new Header();
133             }
134             else
135             {
136                 return;
137             }
138         }
139         _header.setDurable(new Boolean(durable));
140     }
141 
142     public void setTtl(long ttl)
143     {
144 
145         if (_header is null)
146         {
147             if (ttl != 0)
148             {
149                 _header = new Header();
150             }
151             else
152             {
153                 return;
154             }
155         }
156         _header.setTtl(UnsignedInteger.valueOf(ttl));
157     }
158 
159     public void setDeliveryCount(long deliveryCount)
160     {
161         if (_header is null)
162         {
163             if (deliveryCount == 0)
164             {
165                 return;
166             }
167             _header = new Header();
168         }
169         _header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount));
170     }
171 
172 
173     public void setFirstAcquirer(bool firstAcquirer)
174     {
175 
176         if (_header is null)
177         {
178             if (!firstAcquirer)
179             {
180                 return;
181             }
182             _header = new Header();
183         }
184         _header.setFirstAcquirer(new Boolean(firstAcquirer));
185     }
186 
187     public void setPriority(short priority)
188     {
189 
190         if (_header is null)
191         {
192             if (priority == DEFAULT_PRIORITY)
193             {
194                 return;
195             }
196             _header = new Header();
197         }
198         _header.setPriority(UnsignedByte.valueOf(cast(byte) priority));
199     }
200 
201     public Object getMessageId()
202     {
203         return _properties is null ? null : _properties.getMessageId();
204     }
205 
206     public long getGroupSequence()
207     {
208         return (_properties is null || _properties.getGroupSequence() is null) ? 0 : _properties.getGroupSequence().intValue();
209     }
210 
211     public String getReplyToGroupId()
212     {
213         return _properties is null ? null : _properties.getReplyToGroupId();
214     }
215 
216     public long getCreationTime()
217     {
218         return (_properties is null || _properties.getCreationTime() is null) ? 0 : _properties.getCreationTime().toEpochMilli();
219     }
220 
221     public String getAddress()
222     {
223         return _properties is null ? null : _properties.getTo();
224     }
225 
226     public byte[] getUserId()
227     {
228         if(_properties is null || _properties.getUserId() is null)
229         {
230             return null;
231         }
232         else
233         {
234              Binary userId = _properties.getUserId();
235             byte[] id = new byte[userId.getLength()];
236            // System.arraycopy(userId.getArray(),userId.getArrayOffset(),id,0,userId.getLength());
237             id[0 .. userId.getLength()] = userId.getArray()[userId.getArrayOffset() .. userId.getArrayOffset() + userId.getLength()];
238             return id;
239         }
240 
241     }
242 
243     public String getReplyTo()
244     {
245         return _properties is null ? null : _properties.getReplyTo();
246     }
247 
248     public String getGroupId()
249     {
250         return _properties is null ? null : _properties.getGroupId();
251     }
252 
253     public String getContentType()
254     {
255         return (_properties is null || _properties.getContentType() is null) ? null : new String(_properties.getContentType().toString());
256     }
257 
258     public long getExpiryTime()
259     {
260         return (_properties is null || _properties.getAbsoluteExpiryTime() is null) ? 0 : _properties.getAbsoluteExpiryTime().toEpochMilli();
261     }
262 
263     public Object getCorrelationId()
264     {
265         return (_properties is null) ? null : _properties.getCorrelationId();
266     }
267 
268     public String getContentEncoding()
269     {
270         return (_properties is null || _properties.getContentEncoding() is null) ? null : new String (_properties.getContentEncoding().toString());
271     }
272 
273     public String getSubject()
274     {
275         return _properties is null ? null : _properties.getSubject();
276     }
277 
278     public void setGroupSequence(long groupSequence)
279     {
280         if(_properties is null)
281         {
282             if(groupSequence == 0)
283             {
284                 return;
285             }
286             else
287             {
288                 _properties = new Properties();
289             }
290         }
291         _properties.setGroupSequence(UnsignedInteger.valueOf(cast(int) groupSequence));
292     }
293 
294     public void setUserId(byte[] userId)
295     {
296         if(userId is null)
297         {
298             if(_properties !is null)
299             {
300                 _properties.setUserId(null);
301             }
302 
303         }
304         else
305         {
306             if(_properties is null)
307             {
308                 _properties = new Properties();
309             }
310             byte[] id = new byte[userId.length];
311           //  System.arraycopy(userId, 0, id,0, userId.length);
312             id[0 .. userId.length] = userId[0 .. userId.length];
313             _properties.setUserId(new Binary(id));
314         }
315     }
316 
317     
318     public void setCreationTime(long creationTime)
319     {
320         if(_properties is null)
321         {
322             if(creationTime == 0)
323             {
324                 return;
325             }
326             _properties = new Properties();
327 
328         }
329         _properties.setCreationTime(Date.ofEpochMilli(creationTime));
330     }
331 
332     
333     public void setSubject(String subject)
334     {
335         if(_properties is null)
336         {
337             if(subject is null)
338             {
339                 return;
340             }
341             _properties = new Properties();
342         }
343         _properties.setSubject(subject);
344     }
345 
346     
347     public void setGroupId(String groupId)
348     {
349         if(_properties is null)
350         {
351             if(groupId is null)
352             {
353                 return;
354             }
355             _properties = new Properties();
356         }
357         _properties.setGroupId(groupId);
358     }
359 
360     
361     public void setAddress(String to)
362     {
363         if(_properties is null)
364         {
365             if(to is null)
366             {
367                 return;
368             }
369             _properties = new Properties();
370         }
371         _properties.setTo(to);
372     }
373 
374     
375     public void setExpiryTime(long absoluteExpiryTime)
376     {
377         if(_properties is null)
378         {
379             if(absoluteExpiryTime == 0)
380             {
381                 return;
382             }
383             _properties = new Properties();
384 
385         }
386         _properties.setAbsoluteExpiryTime(Date.ofEpochMilli(absoluteExpiryTime));
387     }
388 
389     
390     public void setReplyToGroupId(String replyToGroupId)
391     {
392         if(_properties is null)
393         {
394             if(replyToGroupId is null)
395             {
396                 return;
397             }
398             _properties = new Properties();
399         }
400         _properties.setReplyToGroupId(replyToGroupId);
401     }
402 
403     
404     public void setContentEncoding(String contentEncoding)
405     {
406         if(_properties is null)
407         {
408             if(contentEncoding is null)
409             {
410                 return;
411             }
412             _properties = new Properties();
413         }
414         _properties.setContentEncoding(Symbol.valueOf(cast(string)(contentEncoding.getBytes())));
415     }
416 
417     
418     public void setContentType(String contentType)
419     {
420         if(_properties is null)
421         {
422             if(contentType is null)
423             {
424                 return;
425             }
426             _properties = new Properties();
427         }
428         _properties.setContentType(Symbol.valueOf(cast(string)(contentType.getBytes())));
429     }
430 
431     
432     public void setReplyTo(String replyTo)
433     {
434 
435         if(_properties is null)
436         {
437             if(replyTo is null)
438             {
439                 return;
440             }
441             _properties = new Properties();
442         }
443         _properties.setReplyTo(replyTo);
444     }
445 
446     
447     public void setCorrelationId(String correlationId)
448     {
449 
450         if(_properties is null)
451         {
452             if(correlationId is null)
453             {
454                 return;
455             }
456             _properties = new Properties();
457         }
458         _properties.setCorrelationId(correlationId);
459     }
460 
461     
462     public void setMessageId(String messageId)
463     {
464 
465         if(_properties is null)
466         {
467             if(messageId is null)
468             {
469                 return;
470             }
471             _properties = new Properties();
472         }
473         _properties.setMessageId(messageId);
474     }
475 
476 
477     
478     public Header getHeader()
479     {
480         return _header;
481     }
482 
483     
484     public DeliveryAnnotations getDeliveryAnnotations()
485     {
486         return _deliveryAnnotations;
487     }
488 
489     
490     public MessageAnnotations getMessageAnnotations()
491     {
492         return _messageAnnotations;
493     }
494 
495     
496     public Properties getProperties()
497     {
498         return _properties;
499     }
500 
501     
502     public ApplicationProperties getApplicationProperties()
503     {
504         return _applicationProperties;
505     }
506 
507     
508     public Section getBody()
509     {
510         return _body;
511     }
512 
513     
514     public Footer getFooter()
515     {
516         return _footer;
517     }
518 
519     
520     public void setHeader(Header header)
521     {
522         _header = header;
523     }
524 
525     
526     public void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations)
527     {
528         _deliveryAnnotations = deliveryAnnotations;
529     }
530 
531     
532     public void setMessageAnnotations(MessageAnnotations messageAnnotations)
533     {
534         _messageAnnotations = messageAnnotations;
535     }
536 
537     
538     public void setProperties(Properties properties)
539     {
540         _properties = properties;
541     }
542 
543     
544     public void setApplicationProperties(ApplicationProperties applicationProperties)
545     {
546         _applicationProperties = applicationProperties;
547     }
548 
549     
550     public void setBody(Section body)
551     {
552         _body = body;
553     }
554 
555     
556     public void setFooter(Footer footer)
557     {
558         _footer = footer;
559     }
560 
561     
562     public int decode(byte[] data, int offset, int length)
563     {
564          ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
565         decode(buffer);
566 
567         return length-buffer.remaining();
568     }
569 
570     public void decode(ByteBuffer buffer)
571     {
572         decode(ByteBufferReader.wrap(buffer));
573     }
574 
575     public void decode(ReadableBuffer buffer)
576     {
577         DecoderImpl decoder = tlsCodec.decoder;
578         decoder.setBuffer(buffer);
579 
580         _header = null;
581         _deliveryAnnotations = null;
582         _messageAnnotations = null;
583         _properties = null;
584         _applicationProperties = null;
585         _body = null;
586         _footer = null;
587         Section section = null;
588 
589         if(buffer.hasRemaining())
590         {
591             section = cast(Section) decoder.readObject();
592         }
593 
594 
595         _header  = cast(Header)section;
596         if(_header !is null)
597         {
598             //_header = (Header) section;
599             if(buffer.hasRemaining())
600             {
601                 section = cast(Section) decoder.readObject();
602             }
603             else
604             {
605                 section = null;
606             }
607 
608         }
609 
610         _deliveryAnnotations = cast(DeliveryAnnotations)section;
611         if(_deliveryAnnotations !is null)
612         {
613             _deliveryAnnotations = cast(DeliveryAnnotations) section;
614 
615             if(buffer.hasRemaining())
616             {
617                 section = cast(Section) decoder.readObject();
618             }
619             else
620             {
621                 section = null;
622             }
623 
624         }
625 
626         _messageAnnotations = cast(MessageAnnotations)section;
627         if(_messageAnnotations !is null)
628         {
629             _messageAnnotations = cast(MessageAnnotations) section;
630 
631             if(buffer.hasRemaining())
632             {
633                 section = cast(Section) decoder.readObject();
634             }
635             else
636             {
637                 section = null;
638             }
639 
640         }
641 
642         _properties = cast(Properties)section;
643         if(_properties !is null)
644         {
645             _properties = cast(Properties) section;
646 
647             if(buffer.hasRemaining())
648             {
649                 section = cast(Section) decoder.readObject();
650             }
651             else
652             {
653                 section = null;
654             }
655 
656         }
657 
658         _applicationProperties = cast(ApplicationProperties)section;
659         if(_applicationProperties !is null)
660         {
661             _applicationProperties = cast(ApplicationProperties) section;
662 
663             if(buffer.hasRemaining())
664             {
665                 section = cast(Section) decoder.readObject();
666             }
667             else
668             {
669                 section = null;
670             }
671 
672         }
673 
674 
675 
676         if(section !is null && (cast(Footer)section is null))
677         {
678             _body = section;
679 
680             if(buffer.hasRemaining())
681             {
682                 section = cast(Section) decoder.readObject();
683             }
684             else
685             {
686                 section = null;
687             }
688 
689         }
690 
691         _footer = cast(Footer)section;
692         if(_footer is null)
693         {
694             _footer = null;
695 
696         }
697 
698         decoder.setBuffer(null);
699     }
700 
701     
702     public int encode(byte[] data, int offset, int length)
703     {
704         ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
705         return encode(new ByteBufferWrapper(buffer));
706     }
707 
708     
709     public int encode2(byte[] data, int offset, int length)
710     {
711         ByteBuffer buffer = BufferUtils.toBuffer(data, offset, length);
712         ByteBufferWrapper first = new ByteBufferWrapper(buffer);
713         DroppingWritableBuffer second = new DroppingWritableBuffer();
714         CompositeWritableBuffer composite = new CompositeWritableBuffer(first, second);
715         int start = composite.position();
716         encode(composite);
717         return composite.position() - start;
718     }
719 
720     
721     public int encode(WritableBuffer buffer)
722     {
723         int length = buffer.remaining();
724         EncoderImpl encoder = tlsCodec.encoder;
725         encoder.setByteBuffer(buffer);
726 
727         if(getHeader() !is null)
728         {
729             encoder.writeObject(getHeader());
730         }
731         if(getDeliveryAnnotations() !is null)
732         {
733             encoder.writeObject(getDeliveryAnnotations());
734         }
735         if(getMessageAnnotations() !is null)
736         {
737             encoder.writeObject(getMessageAnnotations());
738         }
739         if(getProperties() !is null)
740         {
741             encoder.writeObject(getProperties());
742         }
743         if(getApplicationProperties() !is null)
744         {
745             encoder.writeObject(getApplicationProperties());
746         }
747         if(getBody() !is null)
748         {
749             encoder.writeObject(cast(Object)getBody());
750         }
751         if(getFooter() !is null)
752         {
753             encoder.writeObject(getFooter());
754         }
755         encoder.setByteBuffer(cast(WritableBuffer)null);
756 
757         return length - buffer.remaining();
758     }
759 
760     
761     public void clear()
762     {
763         _body = null;
764     }
765 
766     
767     public MessageError getError()
768     {
769         return MessageError.OK;
770     }
771 
772     //public String toString()
773     //{
774     //    StringBuilder sb = new StringBuilder();
775     //    sb.append("Message{");
776     //    if (_header !is null) {
777     //        sb.append("header=");
778     //        sb.append(_header);
779     //    }
780     //    if (_properties !is null) {
781     //        sb.append("properties=");
782     //        sb.append(_properties);
783     //    }
784     //    if (_messageAnnotations !is null) {
785     //        sb.append("message_annotations=");
786     //        sb.append(_messageAnnotations);
787     //    }
788     //    if (_body !is null) {
789     //        sb.append("body=");
790     //        sb.append(_body);
791     //    }
792     //    sb.append("}");
793     //    return sb.toString();
794     //}
795 
796 }
797