/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.proton.codec.transport.FastPathTransferType;

import hunt.collection.Collection;

import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.UnsignedByte;
import hunt.proton.amqp.UnsignedLong;
import hunt.proton.amqp.transport.DeliveryState;
import hunt.proton.amqp.transport.ReceiverSettleMode;
import hunt.proton.amqp.transport.Transfer;
import hunt.proton.codec.AMQPType;
import hunt.proton.codec.DecodeException;
import hunt.proton.codec.Decoder;
import hunt.proton.codec.DecoderImpl;
import hunt.proton.codec.EncoderImpl;
import hunt.proton.codec.EncodingCodes;
import hunt.proton.codec.FastPathDescribedTypeConstructor;
import hunt.proton.codec.TypeEncoding;
import hunt.proton.codec.WritableBuffer;
import std.concurrency : initOnce;
import hunt.logging;
import hunt.proton.codec.transport.TransferType;
import hunt.Exceptions;
import hunt.Boolean;
import std.conv : to;

/**
 * Fast TransferType encoder.
 *
 * Reads and writes `Transfer` values directly against the codec buffers and
 * delegates everything else (encoder/decoder access, encoding lookup) to a
 * wrapped `TransferType`.
 */
class FastPathTransferType : AMQPType!(Transfer), FastPathDescribedTypeConstructor!(Transfer) {

    /// Numeric descriptor emitted by `write` and registered through `DESCRIPTORS`.
    private static byte DESCRIPTOR_CODE = 0x14;

    /**
     * Descriptors under which this type is registered with a decoder: the
     * numeric code and the symbolic name.
     *
     * The array is created on first use via `initOnce` and stored in a
     * `__gshared` variable, so every caller receives the same array instance.
     */
    static Object[] DESCRIPTORS() {
        __gshared Object[] inst;
        return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:transfer:list")]);
    }

    /// Regular (non fast-path) type that owns the encoder and the encodings.
    private TransferType transferType;

    /**
     * Params:
     *   encoder = encoder handed to the wrapped `TransferType`.
     */
    this(EncoderImpl encoder) {
        this.transferType = new TransferType(encoder);
    }

    /// Returns: the encoder of the wrapped `TransferType`.
    public EncoderImpl getEncoder() {
        return transferType.getEncoder();
    }

    /// Returns: the decoder of the wrapped `TransferType`.
    public DecoderImpl getDecoder() {
        return transferType.getDecoder();
    }

    /**
     * Decodes a `Transfer` from the decoder's buffer, starting at the list
     * type code that follows the descriptor.
     *
     * Returns: the decoded `Transfer`, or `null` (after logging an error) when
     *          the type code is not LIST0, LIST8 or LIST32.
     * Throws: IllegalStateException when the encoded list announces more than
     *         11 entries.
     */
    override
    public Object readValue() {
        DecoderImpl decoder = getDecoder();
        auto buffer = decoder.getBuffer();
        byte typeCode = buffer.get();

        int count = 0;

        switch (typeCode) {
            case EncodingCodes.LIST0:
                // TODO - Technically invalid however old decoder also allowed this.
                break;
            case EncodingCodes.LIST8:
                // The size field is not needed for decoding, but it has to be
                // consumed so the buffer is positioned on the element count.
                buffer.get();
                count = (cast(int) buffer.get()) & 0xff;
                break;
            case EncodingCodes.LIST32:
                // Same as above: skip the 4-byte size, keep the 4-byte count.
                buffer.getInt();
                count = buffer.getInt();
                break;
            default:
            {
                // NOTE(review): an unknown type code is only logged and null is
                // returned; callers have to cope with a null result. Throwing a
                // DecodeException here would be stricter but changes behavior.
                logError("Incorrect type found in Transfer encoding: %d",typeCode);
                return null;
            }
        }

        Transfer transfer = new Transfer();

        // Fields are encoded positionally; trailing fields may be omitted, so
        // only the first `count` positions are read.
        for (int index = 0; index < count; ++index) {
            switch (index) {
                case 0:
                    transfer.setHandle(decoder.readUnsignedInteger(null));
                    break;
                case 1:
                    transfer.setDeliveryId(decoder.readUnsignedInteger(null));
                    break;
                case 2:
                    transfer.setDeliveryTag(decoder.readBinary(null));
                    break;
                case 3:
                    transfer.setMessageFormat(decoder.readUnsignedInteger(null));
                    break;
                case 4:
                    transfer.setSettled(decoder.readBoolean(null));
                    break;
                case 5:
                    transfer.setMore(decoder.readBoolean(Boolean.FALSE));
                    break;
                case 6:
                    UnsignedByte rcvSettleMode = decoder.readUnsignedByte();
                    transfer.setRcvSettleMode(rcvSettleMode is null ? null : ReceiverSettleMode.valueOf(rcvSettleMode));
                    break;
                case 7:
                    transfer.setState(cast(DeliveryState) decoder.readObject());
                    break;
                case 8:
                    transfer.setResume(decoder.readBoolean(Boolean.FALSE));
                    break;
                case 9:
                    transfer.setAborted(decoder.readBoolean(Boolean.FALSE));
                    break;
                case 10:
                    transfer.setBatchable(decoder.readBoolean(Boolean.FALSE));
                    break;
                default:
                    throw new IllegalStateException("To many entries in Transfer encoding");
            }
        }

        return transfer;
    }

    /// Skips the next encoded value by reading its constructor and delegating to it.
    override
    public void skipValue() {
        getDecoder().readConstructor().skipValue();
    }

    /// Returns: always `false`.
    override
    public bool encodesJavaPrimitive() {
        return false;
    }

    /// Returns: the `TypeInfo` of `Transfer`.
    override
    public TypeInfo getTypeClass() {
        return typeid(Transfer);
    }

    /// Returns: the encoding the wrapped `TransferType` selects for `transfer`.
    override
    public ITypeEncoding getEncoding(Object transfer) {
        return transferType.getEncoding(cast(Transfer)transfer);
    }

    /// Returns: the canonical encoding of the wrapped `TransferType`.
    override
    public TypeEncoding!(Transfer) getCanonicalEncoding() {
        return transferType.getCanonicalEncoding();
    }

    /// Returns: all encodings of the wrapped `TransferType`.
    override
    public Collection!(TypeEncoding!(Transfer))getAllEncodings() {
        return transferType.getAllEncodings();
    }

    /**
     * Encodes a `Transfer` into the encoder's buffer as a described list.
     *
     * Layout written: described-type indicator, SMALLULONG, descriptor code,
     * list encoding code, size, element count, then the elements.
     *
     * Params:
     *   v = the value to encode; must be a non-null `Transfer`.
     * Throws: IllegalArgumentException when `v` is null or not a `Transfer`.
     */
    override
    public void write(Object v) {

        Transfer value = cast(Transfer)v;
        if (value is null) {
            // A class downcast yields null on a type mismatch; fail with a
            // clear error instead of dereferencing null further down.
            throw new IllegalArgumentException("FastPathTransferType can only write a non-null Transfer");
        }

        WritableBuffer buffer = getEncoder().getBuffer();
        int count = getElementCount(value);
        byte encodingCode = deduceEncodingCode(value, count);

        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
        buffer.put(EncodingCodes.SMALLULONG);
        buffer.put(DESCRIPTOR_CODE);
        buffer.put(encodingCode);

        // LIST8 uses one byte each for size and count, LIST32 uses four.
        int fieldWidth = (encodingCode == EncodingCodes.LIST8) ? 1 : 4;

        int startIndex = buffer.position();

        // Reserve space for the size and write the count of list elements.
        if (fieldWidth == 1) {
            buffer.put(cast(byte) 0);
            buffer.put(cast(byte) count);
        } else {
            buffer.putInt(0);
            buffer.putInt(count);
        }

        // Write the list elements and then compute total size written.
        for (int i = 0; i < count; ++i) {
            writeElement(value, i);
        }

        // Move back and write the size. The size covers everything after the
        // size field itself, i.e. the count field plus the elements.
        int endIndex = buffer.position();
        int writeSize = endIndex - startIndex - fieldWidth;

        buffer.position(startIndex);
        if (fieldWidth == 1) {
            buffer.put(cast(byte) writeSize);
        } else {
            buffer.putInt(writeSize);
        }
        buffer.position(endIndex);
    }

    /**
     * Writes the field at list position `index` of `transfer` to the encoder.
     *
     * Throws: IllegalArgumentException when `index` is outside 0..10.
     */
    private void writeElement(Transfer transfer, int index) {
        switch (index) {
            case 0:
                getEncoder().writeUnsignedInteger(transfer.getHandle());
                break;
            case 1:
                getEncoder().writeUnsignedInteger(transfer.getDeliveryId());
                break;
            case 2:
                getEncoder().writeBinary(transfer.getDeliveryTag());
                break;
            case 3:
                getEncoder().writeUnsignedInteger(transfer.getMessageFormat());
                break;
            case 4:
                getEncoder().writeBoolean(transfer.getSettled());
                break;
            case 5:
                getEncoder().writeBoolean(transfer.getMore());
                break;
            case 6:
                ReceiverSettleMode rcvSettleMode = transfer.getRcvSettleMode();
                getEncoder().writeObject(rcvSettleMode is null ? null : rcvSettleMode.getValue());
                break;
            case 7:
                getEncoder().writeObject(cast(Object)(transfer.getState()));
                break;
            case 8:
                getEncoder().writeBoolean(transfer.getResume());
                break;
            case 9:
                getEncoder().writeBoolean(transfer.getAborted());
                break;
            case 10:
                getEncoder().writeBoolean(transfer.getBatchable());
                break;
            default:
                throw new IllegalArgumentException("Unknown Transfer value index: " ~ to!string(index));
        }
    }

    /**
     * Picks the list encoding for `value`: LIST32 when a delivery state is
     * present or the delivery tag is longer than 200 bytes, LIST8 otherwise.
     *
     * `elementCount` is currently not consulted.
     */
    private byte deduceEncodingCode(Transfer value, int elementCount) {
        if (value.getState() !is null) {
            return EncodingCodes.LIST32;
        } else if (value.getDeliveryTag() !is null && value.getDeliveryTag().getLength() > 200) {
            return EncodingCodes.LIST32;
        } else {
            return EncodingCodes.LIST8;
        }
    }

    /// Null-safe truth test: a missing (null) flag counts as `false`.
    private static bool isTrue(Boolean flag) {
        return flag !is null && flag.booleanValue;
    }

    /**
     * Number of list entries that must be written for `transfer`: one past the
     * position of the last field carrying a value (a `true` flag or a non-null
     * reference). The handle at position 0 is always written, so the result is
     * at least 1.
     */
    private int getElementCount(Transfer transfer) {
        if (isTrue(transfer.getBatchable())) {
            return 11;
        } else if (isTrue(transfer.getAborted())) {
            return 10;
        } else if (isTrue(transfer.getResume())) {
            return 9;
        } else if (transfer.getState() !is null) {
            return 8;
        } else if (transfer.getRcvSettleMode() !is null) {
            return 7;
        } else if (isTrue(transfer.getMore())) {
            return 6;
        } else if (transfer.getSettled() !is null) {
            return 5;
        } else if (transfer.getMessageFormat() !is null) {
            return 4;
        } else if (transfer.getDeliveryTag() !is null) {
            return 3;
        } else if (transfer.getDeliveryId() !is null) {
            return 2;
        } else {
            return 1;
        }
    }

    /**
     * Creates one `FastPathTransferType` and registers it with `decoder` as the
     * fast path for every descriptor in `DESCRIPTORS`, and with `encoder`.
     */
    public static void register(Decoder decoder, EncoderImpl encoder) {
        FastPathTransferType type = new FastPathTransferType(encoder);
        foreach(Object descriptor ; DESCRIPTORS)
        {
            decoder.registerFastPath(descriptor, type);
        }
        encoder.register(type);
    }
}