/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.proton.codec.transport.FastPathDispositionType;

import hunt.collection.Collection;

import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.UnsignedLong;
import hunt.proton.amqp.messaging.Accepted;
import hunt.proton.amqp.messaging.Released;
import hunt.proton.amqp.transport.DeliveryState;
import hunt.proton.amqp.transport.Disposition;
import hunt.proton.amqp.transport.Role;
import hunt.proton.codec.AMQPType;
import hunt.proton.codec.DecodeException;
import hunt.proton.codec.Decoder;
import hunt.proton.codec.DecoderImpl;
import hunt.proton.codec.EncoderImpl;
import hunt.proton.codec.EncodingCodes;
import hunt.proton.codec.FastPathDescribedTypeConstructor;
import hunt.proton.codec.TypeEncoding;
import hunt.proton.codec.WritableBuffer;
import std.concurrency : initOnce;
import hunt.logging;
import hunt.proton.codec.transport.DispositionType;
import hunt.Boolean;
import hunt.Exceptions;
import std.conv : to;

/**
 * Hand-written encoder/decoder for `Disposition` values.
 *
 * Type metadata queries (encodings, encoder, decoder) are delegated to a
 * wrapped `DispositionType`; `readValue` and `write` read and write the list
 * body directly from/to the codec buffers.
 */
class FastPathDispositionType : AMQPType!(Disposition), FastPathDescribedTypeConstructor!(Disposition) {

    /// Numeric descriptor written in front of an encoded disposition list.
    private static byte DESCRIPTOR_CODE = 0x15;
    /// Numeric descriptor used in the pre-encoded form of the `Accepted` state.
    private static byte ACCEPTED_DESCRIPTOR_CODE = 0x24;

    /**
     * Descriptors (numeric and symbolic) under which this type is registered
     * with a decoder. The array is created once and shared afterwards.
     */
    static Object[] DESCRIPTORS() {
        __gshared Object[] inst;
        return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:disposition:list")]);
    }

    /**
     * Complete pre-encoded bytes of an `Accepted` delivery state: described
     * type indicator, small-ulong descriptor and an empty list. The array is
     * created once and shared afterwards.
     */
    static byte[] ACCEPTED_ENCODED_BYTES() {
        __gshared byte[] inst;
        return initOnce!inst([ EncodingCodes.DESCRIBED_TYPE_INDICATOR,
                               EncodingCodes.SMALLULONG,
                               ACCEPTED_DESCRIPTOR_CODE,
                               EncodingCodes.LIST0]);
    }

    /// Generic disposition type that all metadata queries are forwarded to.
    private DispositionType dispositionType;

    /**
     * Params:
     *   encoder = encoder handed to the wrapped `DispositionType`
     */
    this(EncoderImpl encoder) {
        this.dispositionType = new DispositionType(encoder);
    }

    /// Returns: the encoder of the wrapped `DispositionType`.
    public EncoderImpl getEncoder() {
        return dispositionType.getEncoder();
    }

    /// Returns: the decoder of the wrapped `DispositionType`.
    public DecoderImpl getDecoder() {
        return dispositionType.getDecoder();
    }

    /// Always `false` for this type.
    override
    public bool encodesJavaPrimitive() {
        return false;
    }

    /// Returns: the `TypeInfo` of `Disposition`.
    override
    public TypeInfo getTypeClass() {
        return typeid(Disposition);
    }

    /// Forwards to the wrapped `DispositionType` after casting the argument to `Disposition`.
    override
    public ITypeEncoding getEncoding(Object disposition) {
        return dispositionType.getEncoding(cast(Disposition)disposition);
    }

    /// Forwards to the wrapped `DispositionType`.
    override
    public TypeEncoding!(Disposition) getCanonicalEncoding() {
        return dispositionType.getCanonicalEncoding();
    }

    /// Forwards to the wrapped `DispositionType`.
    override
    public Collection!(TypeEncoding!(Disposition)) getAllEncodings() {
        return dispositionType.getAllEncodings();
    }

    /**
     * Decodes a disposition list from the decoder's buffer.
     *
     * The first byte read is the list constructor; it selects how the size and
     * element count are read. Elements are then read positionally (role,
     * first, last, settled, state, batchable).
     *
     * Returns: the decoded `Disposition`, or `null` (after logging an error)
     *          when the constructor byte is not LIST0, LIST8 or LIST32.
     * Throws: IllegalStateException when the list declares more than six elements.
     */
    override
    public Disposition readValue() {
        DecoderImpl decoder = getDecoder();
        auto input = decoder.getBuffer();
        byte typeCode = input.get();

        int count = 0;

        switch (typeCode) {
            case EncodingCodes.LIST0:
                // TODO - Technically invalid however old decoder also allowed this.
                break;
            case EncodingCodes.LIST8:
                // The size field is not needed for decoding, but it must be consumed.
                input.get();
                count = (cast(int) input.get()) & 0xff;
                break;
            case EncodingCodes.LIST32:
                // The size field is not needed for decoding, but it must be consumed.
                input.getInt();
                count = input.getInt();
                break;
            default:
            {
                // Build the message explicitly instead of passing a printf-style
                // format string to the non-formatting logger call.
                logError("Incorrect type found in Disposition encoding: " ~ to!string(typeCode));
                return null;
            }
        }

        Disposition disposition = new Disposition();

        for (int index = 0; index < count; ++index) {
            switch (index) {
                case 0:
                    disposition.setRole(Boolean.TRUE == (decoder.readBoolean()) ? Role.RECEIVER : Role.SENDER);
                    break;
                case 1:
                    disposition.setFirst(decoder.readUnsignedInteger(null));
                    break;
                case 2:
                    disposition.setLast(decoder.readUnsignedInteger(null));
                    break;
                case 3:
                    disposition.setSettled(decoder.readBoolean(Boolean.FALSE()));
                    break;
                case 4:
                    disposition.setState(cast(DeliveryState) decoder.readObject());
                    break;
                case 5:
                    disposition.setBatchable(decoder.readBoolean(Boolean.FALSE()));
                    break;
                default:
                    throw new IllegalStateException("To many entries in Disposition encoding");
            }
        }

        return disposition;
    }

    /// Skips one encoded value by reading its constructor and delegating to it.
    override
    public void skipValue() {
        getDecoder().readConstructor().skipValue();
    }

    /**
     * Encodes a disposition into the encoder's buffer.
     *
     * Output order: described-type indicator, small-ulong descriptor, list
     * constructor, size, element count, elements. The size is not known up
     * front, so a placeholder is written first and patched once all elements
     * have been emitted.
     *
     * Params:
     *   v = value to encode; cast to `Disposition`
     */
    override
    public void write(Object v) {

        Disposition disposition = cast(Disposition)v;

        WritableBuffer buffer = getEncoder().getBuffer();
        int count = getElementCount(disposition);
        byte encodingCode = deduceEncodingCode(disposition, count);

        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
        buffer.put(EncodingCodes.SMALLULONG);
        buffer.put(DESCRIPTOR_CODE);
        buffer.put(encodingCode);

        // LIST8 uses one-byte size/count fields, anything else four-byte fields.
        int fieldWidth = (encodingCode == EncodingCodes.LIST8) ? 1 : 4;

        int startIndex = buffer.position();

        // Reserve space for the size and write the count of list elements.
        if (fieldWidth == 1) {
            buffer.put(cast(byte) 0);
            buffer.put(cast(byte) count);
        } else {
            buffer.putInt(0);
            buffer.putInt(count);
        }

        // Write the list elements and then compute total size written.
        for (int i = 0; i < count; ++i) {
            writeElement(disposition, i);
        }

        // Move back and write the size. The size covers everything after the
        // size field itself, i.e. the count field plus the elements.
        int endIndex = buffer.position();
        int writeSize = endIndex - startIndex - fieldWidth;

        buffer.position(startIndex);
        if (fieldWidth == 1) {
            buffer.put(cast(byte) writeSize);
        } else {
            buffer.putInt(writeSize);
        }
        buffer.position(endIndex);
    }

    /**
     * Writes the list element at the given position.
     *
     * Params:
     *   disposition = value being encoded
     *   index       = zero-based field position (0 = role ... 5 = batchable)
     * Throws: IllegalArgumentException for an index outside 0..5.
     */
    private void writeElement(Disposition disposition, int index) {
        EncoderImpl encoder = getEncoder();

        switch (index) {
            case 0:
                encoder.writeBoolean(disposition.getRole().getValue());
                break;
            case 1:
                encoder.writeUnsignedInteger(disposition.getFirst());
                break;
            case 2:
                encoder.writeUnsignedInteger(disposition.getLast());
                break;
            case 3:
                encoder.writeBoolean(disposition.getSettled());
                break;
            case 4:
                writeState(disposition);
                break;
            case 5:
                encoder.writeBoolean(disposition.getBatchable());
                break;
            default:
                throw new IllegalArgumentException("Unknown Disposition value index: " ~ to!string(index));
        }
    }

    /**
     * Writes the delivery-state element.
     *
     * The state may legitimately be null here: getElementCount reports six
     * elements for a batchable disposition even when no state is set. A null
     * state is therefore never dereferenced; it is handed to writeObject.
     * NOTE(review): this relies on EncoderImpl.writeObject encoding a null
     * reference as an AMQP null - confirm against EncoderImpl.
     */
    private void writeState(Disposition disposition) {
        auto state = disposition.getState();

        if (state !is null && Accepted.getInstance().getType == state.getType) {
            // Accepted has no fields, so its pre-encoded bytes are copied verbatim.
            byte[] acceptedBytes = ACCEPTED_ENCODED_BYTES();
            getEncoder().getBuffer().put(acceptedBytes, 0, cast(int) acceptedBytes.length);
        } else {
            getEncoder().writeObject(cast(Object) state);
        }
    }

    /**
     * Determines how many leading list fields have to be written: the position
     * of the last field carrying a non-default value, with role and first
     * always included.
     */
    private int getElementCount(Disposition disposition) {
        if (disposition.getBatchable().booleanValue()) {
            return 6;
        }
        if (disposition.getState() !is null) {
            return 5;
        }
        if (disposition.getSettled().booleanValue) {
            return 4;
        }
        if (disposition.getLast() !is null) {
            return 3;
        }
        return 2;
    }

    /**
     * Chooses the list constructor: LIST8 when there is no state or the state
     * is of the Accepted or Released type, LIST32 for every other state.
     *
     * Params:
     *   value        = disposition being encoded
     *   elementCount = number of elements that will be written (currently unused)
     */
    private byte deduceEncodingCode(Disposition value, int elementCount) {
        auto state = value.getState();

        if (state is null) {
            return EncodingCodes.LIST8;
        }

        //TODO
        bool compactState = state.getType() == Accepted.getInstance().getType()
                || state.getType() == Released.getInstance().getType();

        return compactState ? EncodingCodes.LIST8 : EncodingCodes.LIST32;
    }

    /**
     * Creates a fast-path disposition type and registers it with the decoder
     * (once per descriptor) and with the encoder.
     */
    public static void register(Decoder decoder, EncoderImpl encoder) {
        FastPathDispositionType type = new FastPathDispositionType(encoder);
        foreach(Object descriptor ; DESCRIPTORS) {
            decoder.registerFastPath(descriptor, type);
        }
        encoder.register(type);
    }
}