1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.proton.codec.transport.FastPathTransferType;
12 
13 import hunt.collection.Collection;
14 
15 import hunt.proton.amqp.Symbol;
16 import hunt.proton.amqp.UnsignedByte;
17 import hunt.proton.amqp.UnsignedLong;
18 import hunt.proton.amqp.transport.DeliveryState;
19 import hunt.proton.amqp.transport.ReceiverSettleMode;
20 import hunt.proton.amqp.transport.Transfer;
21 import hunt.proton.codec.AMQPType;
22 import hunt.proton.codec.DecodeException;
23 import hunt.proton.codec.Decoder;
24 import hunt.proton.codec.DecoderImpl;
25 import hunt.proton.codec.EncoderImpl;
26 import hunt.proton.codec.EncodingCodes;
27 import hunt.proton.codec.FastPathDescribedTypeConstructor;
28 import hunt.proton.codec.TypeEncoding;
29 import hunt.proton.codec.WritableBuffer;
30 import std.concurrency : initOnce;
31 import hunt.logging;
32 import hunt.proton.codec.transport.TransferType;
33 import hunt.Exceptions;
34 import hunt.Boolean;
35 import std.conv : to;
36 /**
37  * Fast TrasnferType encoder
38  */
39 class FastPathTransferType : AMQPType!(Transfer), FastPathDescribedTypeConstructor!(Transfer) {
40 
41     private static byte DESCRIPTOR_CODE = 0x14;
42 
43     //private static Object[] DESCRIPTORS =
44     //{
45     //    UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:transfer:list"),
46     //};
47 
48      static Object[]  DESCRIPTORS() {
49          __gshared Object[]  inst;
50          return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:transfer:list")]);
51      }
52 
53     private TransferType transferType;
54 
55     this(EncoderImpl encoder) {
56         this.transferType = new TransferType(encoder);
57     }
58 
59     public EncoderImpl getEncoder() {
60         return transferType.getEncoder();
61     }
62 
63     public DecoderImpl getDecoder() {
64         return transferType.getDecoder();
65     }
66 
67     override
68     public Object readValue() {
69         DecoderImpl decoder = getDecoder();
70         byte typeCode = decoder.getBuffer().get();
71 
72         int size = 0;
73         int count = 0;
74 
75         switch (typeCode) {
76             case EncodingCodes.LIST0:
77                 // TODO - Technically invalid however old decoder also allowed this.
78                 break;
79             case EncodingCodes.LIST8:
80                 size = (cast(int)decoder.getBuffer().get()) & 0xff;
81                 count = (cast(int)decoder.getBuffer().get()) & 0xff;
82                 break;
83             case EncodingCodes.LIST32:
84                 size = decoder.getBuffer().getInt();
85                 count = decoder.getBuffer().getInt();
86                 break;
87             default:
88             {
89                 logError("Incorrect type found in Transfer encoding: %d",typeCode);
90                 return null;
91             }
92               //  throw new DecodeException("Incorrect type found in Transfer encoding: " ~ typeCode);
93         }
94 
95         Transfer transfer = new Transfer();
96 
97         for (int index = 0; index < count; ++index) {
98             switch (index) {
99                 case 0:
100                     transfer.setHandle(decoder.readUnsignedInteger(null));
101                     break;
102                 case 1:
103                     transfer.setDeliveryId(decoder.readUnsignedInteger(null));
104                     break;
105                 case 2:
106                     transfer.setDeliveryTag(decoder.readBinary(null));
107                     break;
108                 case 3:
109                     transfer.setMessageFormat(decoder.readUnsignedInteger(null));
110                     break;
111                 case 4:
112                     transfer.setSettled(decoder.readBoolean(null));
113                     break;
114                 case 5:
115                     transfer.setMore(decoder.readBoolean(Boolean.FALSE));
116                     break;
117                 case 6:
118                     UnsignedByte rcvSettleMode = decoder.readUnsignedByte();
119                     transfer.setRcvSettleMode(rcvSettleMode is null ? null : ReceiverSettleMode.valueOf(rcvSettleMode));
120                     break;
121                 case 7:
122                     transfer.setState(cast(DeliveryState) decoder.readObject());
123                     break;
124                 case 8:
125                     transfer.setResume(decoder.readBoolean(Boolean.FALSE));
126                     break;
127                 case 9:
128                     transfer.setAborted(decoder.readBoolean(Boolean.FALSE));
129                     break;
130                 case 10:
131                     transfer.setBatchable(decoder.readBoolean(Boolean.FALSE));
132                     break;
133                 default:
134                     throw new IllegalStateException("To many entries in Transfer encoding");
135             }
136         }
137 
138         return transfer;
139     }
140 
141     override
142     public void skipValue() {
143         getDecoder().readConstructor().skipValue();
144     }
145 
146     override
147     public bool encodesJavaPrimitive() {
148         return false;
149     }
150 
151     override
152     public TypeInfo getTypeClass() {
153         return typeid(Transfer);
154     }
155 
156     override
157     public ITypeEncoding getEncoding(Object transfer) {
158         return transferType.getEncoding(cast(Transfer)transfer);
159     }
160 
161     override
162     public TypeEncoding!(Transfer) getCanonicalEncoding() {
163         return transferType.getCanonicalEncoding();
164     }
165 
166     override
167     public Collection!(TypeEncoding!(Transfer))getAllEncodings() {
168         return transferType.getAllEncodings();
169     }
170 
171     override
172     public void write(Object v) {
173 
174         Transfer value = cast(Transfer)v;
175 
176         WritableBuffer buffer = getEncoder().getBuffer();
177         int count = getElementCount(value);
178         byte encodingCode = deduceEncodingCode(value, count);
179 
180         buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
181         buffer.put(EncodingCodes.SMALLULONG);
182         buffer.put(DESCRIPTOR_CODE);
183         buffer.put(encodingCode);
184 
185         int fieldWidth;
186 
187         if (encodingCode == EncodingCodes.LIST8) {
188             fieldWidth = 1;
189         } else {
190             fieldWidth = 4;
191         }
192 
193         int startIndex = buffer.position();
194 
195         // Reserve space for the size and write the count of list elements.
196         if (fieldWidth == 1) {
197             buffer.put(cast(byte) 0);
198             buffer.put(cast(byte) count);
199         } else {
200             buffer.putInt(0);
201             buffer.putInt(count);
202         }
203 
204         // Write the list elements and then compute total size written.
205         for (int i = 0; i < count; ++i) {
206             writeElement(value, i);
207         }
208 
209         // Move back and write the size
210         int endIndex = buffer.position();
211         int writeSize = endIndex - startIndex - fieldWidth;
212 
213         buffer.position(startIndex);
214         if (fieldWidth == 1) {
215             buffer.put(cast(byte) writeSize);
216         } else {
217             buffer.putInt(writeSize);
218         }
219         buffer.position(endIndex);
220     }
221 
222     private void writeElement(Transfer transfer, int index) {
223         switch (index) {
224             case 0:
225                 getEncoder().writeUnsignedInteger(transfer.getHandle());
226                 break;
227             case 1:
228                 getEncoder().writeUnsignedInteger(transfer.getDeliveryId());
229                 break;
230             case 2:
231                 getEncoder().writeBinary(transfer.getDeliveryTag());
232                 break;
233             case 3:
234                 getEncoder().writeUnsignedInteger(transfer.getMessageFormat());
235                 break;
236             case 4:
237                 getEncoder().writeBoolean(transfer.getSettled());
238                 break;
239             case 5:
240                 getEncoder().writeBoolean(transfer.getMore());
241                 break;
242             case 6:
243                 ReceiverSettleMode rcvSettleMode = transfer.getRcvSettleMode();
244                 getEncoder().writeObject(rcvSettleMode is null ? null : rcvSettleMode.getValue());
245                 break;
246             case 7:
247                 getEncoder().writeObject(cast(Object)(transfer.getState()));
248                // implementationMissing(false);
249                 break;
250             case 8:
251                 getEncoder().writeBoolean(transfer.getResume());
252                 break;
253             case 9:
254                 getEncoder().writeBoolean(transfer.getAborted());
255                 break;
256             case 10:
257                 getEncoder().writeBoolean(transfer.getBatchable());
258                 break;
259             default:
260                 throw new IllegalArgumentException("Unknown Transfer value index: " ~ to!string(index));
261         }
262     }
263 
264     private byte deduceEncodingCode(Transfer value, int elementCount) {
265         if (value.getState() !is null) {
266             return EncodingCodes.LIST32;
267         } else if (value.getDeliveryTag() !is null && value.getDeliveryTag().getLength() > 200) {
268             return EncodingCodes.LIST32;
269         } else {
270             return EncodingCodes.LIST8;
271         }
272     }
273 
274     private int getElementCount(Transfer transfer) {
275         if (transfer.getBatchable().booleanValue) {
276             return 11;
277         } else if (transfer.getAborted().booleanValue) {
278             return 10;
279         } else if (transfer.getResume().booleanValue) {
280             return 9;
281         } else if (transfer.getState() !is null) {
282             return 8;
283         } else if (transfer.getRcvSettleMode() !is null) {
284             return 7;
285         } else if (transfer.getMore().booleanValue) {
286             return 6;
287         } else if (transfer.getSettled() !is null) {
288             return 5;
289         } else if (transfer.getMessageFormat() !is null) {
290             return 4;
291         } else if (transfer.getDeliveryTag() !is null) {
292             return 3;
293         } else if (transfer.getDeliveryId() !is null) {
294             return 2;
295         } else {
296             return 1;
297         }
298     }
299 
300     public static void register(Decoder decoder, EncoderImpl encoder) {
301         FastPathTransferType type = new FastPathTransferType(encoder);
302         foreach(Object descriptor ; DESCRIPTORS)
303         {
304             decoder.registerFastPath(descriptor,  type);
305         }
306         encoder.register(type);
307     }
308 }