1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.proton.codec.transport.FastPathDispositionType;
12 
13 import hunt.collection.Collection;
14 
15 import hunt.proton.amqp.Symbol;
16 import hunt.proton.amqp.UnsignedLong;
17 import hunt.proton.amqp.messaging.Accepted;
18 import hunt.proton.amqp.messaging.Released;
19 import hunt.proton.amqp.transport.DeliveryState;
20 import hunt.proton.amqp.transport.Disposition;
21 import hunt.proton.amqp.transport.Role;
22 import hunt.proton.codec.AMQPType;
23 import hunt.proton.codec.DecodeException;
24 import hunt.proton.codec.Decoder;
25 import hunt.proton.codec.DecoderImpl;
26 import hunt.proton.codec.EncoderImpl;
27 import hunt.proton.codec.EncodingCodes;
28 import hunt.proton.codec.FastPathDescribedTypeConstructor;
29 import hunt.proton.codec.TypeEncoding;
30 import hunt.proton.codec.WritableBuffer;
31 import std.concurrency : initOnce;
32 import hunt.logging;
33 import hunt.proton.codec.transport.DispositionType;
34 import hunt.Boolean;
35 import hunt.Exceptions;
36 import std.conv : to;
37 
/**
 * Fast-path encoder/decoder for the AMQP Disposition performative.
 *
 * Generic type bookkeeping (encodings, encoder/decoder access) is delegated to
 * a wrapped DispositionType; this class hand-writes and hand-reads the list
 * body so that a Disposition does not go through the generic described-type
 * machinery.
 */
class FastPathDispositionType : AMQPType!(Disposition), FastPathDescribedTypeConstructor!(Disposition) {

    // Numeric descriptor written in front of an encoded Disposition list.
    private static byte DESCRIPTOR_CODE = 0x15;
    // Numeric descriptor used in the pre-encoded "accepted" delivery state.
    private static byte ACCEPTED_DESCRIPTOR_CODE = 0x24;

    /**
     * Descriptors (numeric and symbolic) under which this type is registered
     * with the decoder. The array is built once and shared afterwards.
     */
    static Object[]  DESCRIPTORS() {
        __gshared Object[]  inst;
        return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:disposition:list")]);
    }

    /**
     * Pre-encoded bytes of an Accepted delivery state: described-type
     * indicator, small-ulong descriptor, and an empty list. The array is built
     * once and shared afterwards.
     */
    static byte[]  ACCEPTED_ENCODED_BYTES() {
        __gshared byte[]  inst;
        return initOnce!inst([ EncodingCodes.DESCRIBED_TYPE_INDICATOR,
        EncodingCodes.SMALLULONG,
        ACCEPTED_DESCRIPTOR_CODE,
        EncodingCodes.LIST0]);
    }

    // Generic Disposition type that supplies encodings and the encoder/decoder.
    private DispositionType dispositionType;

    /**
     * Creates the fast-path type on top of a DispositionType bound to `encoder`.
     */
    this(EncoderImpl encoder) {
        this.dispositionType = new DispositionType(encoder);
    }

    /// Returns the encoder of the wrapped DispositionType.
    public EncoderImpl getEncoder() {
        return dispositionType.getEncoder();
    }

    /// Returns the decoder of the wrapped DispositionType.
    public DecoderImpl getDecoder() {
        return dispositionType.getDecoder();
    }

    /// Always false: a Disposition is not a primitive value.
    override
    public bool encodesJavaPrimitive() {
        return false;
    }

    /// Returns the runtime type information of Disposition.
    override
    public TypeInfo getTypeClass() {
        return typeid(Disposition);
    }

    /// Delegates to the wrapped DispositionType after casting to Disposition.
    override
    public ITypeEncoding getEncoding(Object disposition) {
        return dispositionType.getEncoding(cast(Disposition)disposition);
    }

    /// Delegates to the wrapped DispositionType.
    override
    public TypeEncoding!(Disposition) getCanonicalEncoding() {
        return dispositionType.getCanonicalEncoding();
    }

    /// Delegates to the wrapped DispositionType.
    override
    public Collection!(TypeEncoding!(Disposition)) getAllEncodings() {
        return dispositionType.getAllEncodings();
    }

    /**
     * Reads a Disposition list body from the decoder's buffer.
     *
     * The list header (type code, size, count) is consumed first, then up to
     * six fields are read in order: role, first, last, settled, state,
     * batchable. Fields beyond `count` keep the defaults of a freshly
     * constructed Disposition.
     *
     * Returns: the decoded Disposition, or null (after logging an error) when
     *          the type code is not one of LIST0, LIST8 or LIST32.
     * Throws: IllegalStateException when the list declares more than six entries.
     */
    override
    public Disposition readValue() {
        DecoderImpl decoder = getDecoder();
        byte typeCode = decoder.getBuffer().get();

        // The size is read only to advance the buffer; decoding is driven by count.
        int size = 0;
        int count = 0;

        switch (typeCode) {
            case EncodingCodes.LIST0:
                // TODO - Technically invalid however old decoder also allowed this.
                break;
            case EncodingCodes.LIST8:
                // Mask to treat the single-byte size and count as unsigned.
                size = (cast(int)decoder.getBuffer().get()) & 0xff;
                count = (cast(int)decoder.getBuffer().get()) & 0xff;
                break;
            case EncodingCodes.LIST32:
                size = decoder.getBuffer().getInt();
                count = decoder.getBuffer().getInt();
                break;
            default:
            {
                // The message carries a %d placeholder, so the formatting
                // logger variant is required for the type code to be rendered.
                logErrorf("Incorrect type found in Disposition encoding: %d",typeCode);
                return null;
            }
        }

        Disposition disposition = new Disposition();

        for (int index = 0; index < count; ++index) {
            switch (index) {
                case 0:
                    disposition.setRole(Boolean.TRUE == (decoder.readBoolean()) ? Role.RECEIVER : Role.SENDER);
                    break;
                case 1:
                    disposition.setFirst(decoder.readUnsignedInteger(null));
                    break;
                case 2:
                    disposition.setLast(decoder.readUnsignedInteger(null));
                    break;
                case 3:
                    disposition.setSettled(decoder.readBoolean(Boolean.FALSE()));
                    break;
                case 4:
                    disposition.setState(cast(DeliveryState) decoder.readObject());
                    break;
                case 5:
                    disposition.setBatchable(decoder.readBoolean(Boolean.FALSE()));
                    break;
                default:
                    throw new IllegalStateException("To many entries in Disposition encoding");
            }
        }

        return disposition;
    }

    /// Skips the next encoded value by reading its constructor and skipping it.
    override
    public void skipValue() {
        getDecoder().readConstructor().skipValue();
    }

    /**
     * Writes `v` (cast to Disposition) to the encoder's buffer as a described list.
     *
     * Layout: described-type indicator, small-ulong descriptor, list encoding
     * code, size, count, then the elements. The size is not known up front, so
     * a placeholder is written and patched once the elements are in place.
     */
    override
    public void write(Object v) {

        Disposition disposition = cast(Disposition)v;

        WritableBuffer buffer = getEncoder().getBuffer();
        int count = getElementCount(disposition);
        byte encodingCode = deduceEncodingCode(disposition, count);

        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
        buffer.put(EncodingCodes.SMALLULONG);
        buffer.put(DESCRIPTOR_CODE);
        buffer.put(encodingCode);

        // LIST8 uses one-byte size/count fields; anything else uses four bytes.
        int fieldWidth = (encodingCode == EncodingCodes.LIST8) ? 1 : 4;

        int startIndex = buffer.position();

        // Reserve space for the size and write the count of list elements.
        if (fieldWidth == 1) {
            buffer.put(cast(byte) 0);
            buffer.put(cast(byte) count);
        } else {
            buffer.putInt(0);
            buffer.putInt(count);
        }

        // Write the list elements and then compute total size written.
        for (int i = 0; i < count; ++i) {
            writeElement(disposition, i);
        }

        // Move back and write the size. The size field itself is excluded,
        // the count field is included.
        int endIndex = buffer.position();
        int writeSize = endIndex - startIndex - fieldWidth;

        buffer.position(startIndex);
        if (fieldWidth == 1) {
            buffer.put(cast(byte) writeSize);
        } else {
            buffer.putInt(writeSize);
        }
        buffer.position(endIndex);
    }

    /**
     * Writes the list element at `index` (0 = role, 1 = first, 2 = last,
     * 3 = settled, 4 = state, 5 = batchable).
     *
     * Throws: IllegalArgumentException for any other index.
     */
    private void writeElement(Disposition disposition, int index) {
        switch (index) {
            case 0:
                getEncoder().writeBoolean(disposition.getRole().getValue());
                break;
            case 1:
                getEncoder().writeUnsignedInteger(disposition.getFirst());
                break;
            case 2:
                getEncoder().writeUnsignedInteger(disposition.getLast());
                break;
            case 3:
                getEncoder().writeBoolean(disposition.getSettled());
                break;
            case 4:
                writeState(disposition);
                break;
            case 5:
                getEncoder().writeBoolean(disposition.getBatchable());
                break;
            default:
                throw new IllegalArgumentException("Unknown Disposition value index: " ~ to!string(index));
        }
    }

    /**
     * Writes the delivery-state element.
     *
     * An Accepted state is emitted from the pre-encoded byte sequence; every
     * other state goes through the generic object writer. The state can be
     * null here: getElementCount() yields 6 for a batchable disposition even
     * when no state is set, so the null case must not be dereferenced.
     * NOTE(review): a null state is handed to writeObject(), which is expected
     * to emit an AMQP null - confirm against EncoderImpl.
     */
    private void writeState(Disposition disposition) {
        auto state = disposition.getState();
        if (state !is null && Accepted.getInstance().getType == state.getType) {
            getEncoder().getBuffer().put(ACCEPTED_ENCODED_BYTES, 0, cast(int)ACCEPTED_ENCODED_BYTES.length);
        } else {
            getEncoder().writeObject(cast(Object)state);
        }
    }

    /**
     * Returns how many leading list fields must be written: the position of
     * the last field carrying a non-default value, and never fewer than 2.
     */
    private int getElementCount(Disposition disposition) {
        if (disposition.getBatchable().booleanValue()) {
            return 6;
        } else if (disposition.getState() !is null) {
            return 5;
        } else if (disposition.getSettled().booleanValue) {
            return 4;
        } else if (disposition.getLast() !is null) {
            return 3;
        } else {
            return 2;
        }
    }

    /**
     * Picks the list encoding: LIST8 when the state is absent, Accepted or
     * Released, LIST32 for any other state.
     *
     * `elementCount` is currently not consulted.
     */
    private byte deduceEncodingCode(Disposition value, int elementCount) {
        auto state = value.getState();
        if (state is null) {
            return EncodingCodes.LIST8;
        }

        auto stateType = state.getType();
        if (stateType == Accepted.getInstance().getType() || stateType == Released.getInstance().getType()) { //TODO
            return EncodingCodes.LIST8;
        }

        return EncodingCodes.LIST32;
    }

    /**
     * Creates a FastPathDispositionType for `encoder`, registers it with
     * `decoder` as the fast path for every entry of DESCRIPTORS, and registers
     * it with `encoder`.
     */
    public static void register(Decoder decoder, EncoderImpl encoder) {
        FastPathDispositionType type = new FastPathDispositionType(encoder);
        foreach(Object descriptor ; DESCRIPTORS) {
            decoder.registerFastPath(descriptor,  type);
        }
        encoder.register(type);
    }
}