1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.proton.codec.messaging.FastPathHeaderType;
12 
13 import hunt.collection.Collection;
14 
15 import hunt.proton.amqp.Symbol;
16 import hunt.proton.amqp.UnsignedLong;
17 import hunt.proton.amqp.messaging.Header;
18 import hunt.proton.codec.AMQPType;
19 import hunt.proton.codec.DecodeException;
20 import hunt.proton.codec.Decoder;
21 import hunt.proton.codec.DecoderImpl;
22 import hunt.proton.codec.EncoderImpl;
23 import hunt.proton.codec.EncodingCodes;
24 import hunt.proton.codec.FastPathDescribedTypeConstructor;
25 import hunt.proton.codec.ReadableBuffer;
26 import hunt.proton.codec.TypeEncoding;
27 import hunt.proton.codec.WritableBuffer;
28 import hunt.Exceptions;
29 
30 import hunt.proton.codec.messaging.HeaderType;
31 import std.concurrency : initOnce;
32 import hunt.logging;
33 import std.conv : to;
34 
35 class FastPathHeaderType : AMQPType!(Header), FastPathDescribedTypeConstructor!(Header) {
36 
37     private static byte DESCRIPTOR_CODE = 0x70;
38 
39     //private static Object[] DESCRIPTORS =
40     //{
41     //    UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:header:list"),
42     //};
43 
44 
45     static Object[]  DESCRIPTORS() {
46         __gshared Object[]  inst;
47         return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:header:list")]);
48     }
49 
50 
51     private HeaderType headerType;
52 
53     this(EncoderImpl encoder) {
54         this.headerType = new HeaderType(encoder);
55     }
56 
57     public EncoderImpl getEncoder() {
58         return headerType.getEncoder();
59     }
60 
61     public DecoderImpl getDecoder() {
62         return headerType.getDecoder();
63     }
64 
65     override
66     public Header readValue() {
67         DecoderImpl decoder = getDecoder();
68         ReadableBuffer buffer = decoder.getBuffer();
69         byte typeCode = decoder.getBuffer().get();
70 
71         int size = 0;
72         int count = 0;
73 
74         switch (typeCode) {
75             case EncodingCodes.LIST0:
76                 break;
77             case EncodingCodes.LIST8:
78                 size = buffer.get() & 0xff;
79                 count = buffer.get() & 0xff;
80                 break;
81             case EncodingCodes.LIST32:
82                 size = buffer.getInt();
83                 count = buffer.getInt();
84                 break;
85             default:
86             {
87                 logError("Incorrect type found in Header encoding: %d",typeCode);
88                 break;
89             }
90                // throw new DecodeException("Incorrect type found in Header encoding: " ~ typeCode);
91         }
92 
93         Header header = new Header();
94 
95         for (int index = 0; index < count; ++index) {
96             switch (index) {
97                 case 0:
98                     header.setDurable(decoder.readBoolean(null));
99                     break;
100                 case 1:
101                     header.setPriority(decoder.readUnsignedByte(null));
102                     break;
103                 case 2:
104                     header.setTtl(decoder.readUnsignedInteger(null));
105                     break;
106                 case 3:
107                     header.setFirstAcquirer(decoder.readBoolean(null));
108                     break;
109                 case 4:
110                     header.setDeliveryCount(decoder.readUnsignedInteger(null));
111                     break;
112                 default:
113                     throw new IllegalStateException("To many entries in Header encoding");
114             }
115         }
116 
117         return header;
118     }
119 
120     override
121     public void skipValue() {
122         //implementationMissing(false);
123        getDecoder().readConstructor().skipValue();
124     }
125 
126     override
127     public bool encodesJavaPrimitive() {
128         return false;
129     }
130 
131     override
132     public TypeInfo getTypeClass() {
133         return typeid(Header);
134     }
135 
136     override
137     public ITypeEncoding getEncoding(Object header) {
138         return headerType.getEncoding(cast(Header)header);
139     }
140 
141     override
142     public TypeEncoding!(Header) getCanonicalEncoding() {
143         return headerType.getCanonicalEncoding();
144     }
145 
146     override
147     public  Collection!(TypeEncoding!(Header)) getAllEncodings() {
148         return headerType.getAllEncodings();
149     }
150 
151     override
152     public void write(Object  v) {
153         Header value = cast(Header)v;
154         WritableBuffer buffer = getEncoder().getBuffer();
155         int count = getElementCount(value);
156         byte encodingCode = deduceEncodingCode(value, count);
157 
158         buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
159         buffer.put(EncodingCodes.SMALLULONG);
160         buffer.put(DESCRIPTOR_CODE);
161         buffer.put(encodingCode);
162 
163         // Optimized step, no other data to be written.
164         if (encodingCode == EncodingCodes.LIST0) {
165             return;
166         }
167 
168         int fieldWidth;
169 
170         if (encodingCode == EncodingCodes.LIST8) {
171             fieldWidth = 1;
172         } else {
173             fieldWidth = 4;
174         }
175 
176         int startIndex = buffer.position();
177 
178         // Reserve space for the size and write the count of list elements.
179         if (fieldWidth == 1) {
180             buffer.put(cast(byte) 0);
181             buffer.put(cast(byte) count);
182         } else {
183             buffer.putInt(0);
184             buffer.putInt(count);
185         }
186 
187         // Write the list elements and then compute total size written.
188         for (int i = 0; i < count; ++i) {
189             writeElement(value, i);
190         }
191 
192         // Move back and write the size
193         int endIndex = buffer.position();
194         int writeSize = endIndex - startIndex - fieldWidth;
195 
196         buffer.position(startIndex);
197         if (fieldWidth == 1) {
198             buffer.put(cast(byte) writeSize);
199         } else {
200             buffer.putInt(writeSize);
201         }
202         buffer.position(endIndex);
203     }
204 
205     private void writeElement(Header header, int index) {
206         switch (index) {
207             case 0:
208                 getEncoder().writeBoolean(header.getDurable());
209                 break;
210             case 1:
211                 getEncoder().writeUnsignedByte(header.getPriority());
212                 break;
213             case 2:
214                 getEncoder().writeUnsignedInteger(header.getTtl());
215                 break;
216             case 3:
217                 getEncoder().writeBoolean(header.getFirstAcquirer());
218                 break;
219             case 4:
220                 getEncoder().writeUnsignedInteger(header.getDeliveryCount());
221                 break;
222             default:
223                 throw new IllegalArgumentException("Unknown Header value index: " ~ to!string(index));
224         }
225     }
226 
227     private int getElementCount(Header header) {
228         if (header.getDeliveryCount() !is null) {
229             return 5;
230         } else if (header.getFirstAcquirer() !is null) {
231             return 4;
232         } else if (header.getTtl() !is null) {
233             return 3;
234         } else if (header.getPriority() !is null) {
235             return 2;
236         } else if (header.getDurable() !is null) {
237             return 1;
238         } else {
239             return 0;
240         }
241     }
242 
243     private byte deduceEncodingCode(Header value, int elementCount) {
244         if (elementCount == 0) {
245             return EncodingCodes.LIST0;
246         } else {
247             return EncodingCodes.LIST8;
248         }
249     }
250 
251     public static void register(Decoder decoder, EncoderImpl encoder) {
252         FastPathHeaderType type = new FastPathHeaderType(encoder);
253         //implementationMissing(false);
254         foreach(Object descriptor ; DESCRIPTORS)
255         {
256             decoder.registerFastPath(descriptor,  type);
257         }
258         encoder.register(type);
259     }
260 }