1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.proton.engine.impl.FrameParser; 13 14 import hunt.proton.engine.impl.AmqpHeader; 15 import hunt.proton.engine.impl.ByteBufferUtils; 16 17 import hunt.io.ByteBuffer; 18 import hunt.io.BufferUtils; 19 import hunt.proton.amqp.Binary; 20 import hunt.proton.amqp.transport.EmptyFrame; 21 import hunt.proton.amqp.transport.FrameBody; 22 import hunt.proton.codec.ByteBufferDecoder; 23 import hunt.proton.codec.DecodeException; 24 import hunt.proton.engine.Transport; 25 import hunt.proton.engine.TransportException; 26 import hunt.proton.framing.TransportFrame; 27 import hunt.proton.engine.impl.TransportInput; 28 import hunt.proton.engine.impl.FrameHandler; 29 import hunt.proton.engine.impl.TransportImpl; 30 import std.concurrency : initOnce; 31 import hunt.proton.engine.impl.ProtocolTracer; 32 import hunt.logging; 33 import hunt.String; 34 35 class FrameParser : TransportInput 36 { 37 private static string HEADER_DESCRIPTION = "AMQP"; 38 39 //private static ByteBuffer _emptyInputBuffer = newWriteableBuffer(0); 40 41 42 static ByteBuffer _emptyInputBuffer() { 43 __gshared ByteBuffer inst; 44 return initOnce!inst(ByteBufferUtils.newWriteableBuffer(0)); 45 } 46 47 enum State 48 { 49 HEADER0, 50 HEADER1, 51 HEADER2, 52 HEADER3, 53 HEADER4, 54 HEADER5, 55 HEADER6, 56 HEADER7, 57 SIZE_0, 58 SIZE_1, 59 SIZE_2, 60 SIZE_3, 61 PRE_PARSE, 62 BUFFERING, 63 PARSING, 64 ERROR 65 } 66 67 private FrameHandler _frameHandler; 68 private ByteBufferDecoder _decoder; 69 private int _inputBufferSize; 70 private int _localMaxFrameSize; 71 private TransportImpl _transport; 72 73 private ByteBuffer _inputBuffer = null; 74 private bool _tail_closed = false; 75 76 private State _state = State.HEADER0; 77 78 private long _framesInput = 0; 79 80 /** the stated size of the current frame */ 81 private int _size; 82 83 /** holds the current frame that is being parsed */ 84 private ByteBuffer _frameBuffer; 85 86 private TransportFrame _heldFrame; 87 private TransportException _parsingError; 88 89 90 /** 91 * We store the last result when processing input so that 92 * we know not to process any more input if it was an error. 93 */ 94 this(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize, TransportImpl transport) 95 { 96 _frameHandler = frameHandler; 97 _decoder = decoder; 98 _localMaxFrameSize = localMaxFrameSize; 99 _inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 16*1024; 100 _transport = transport; 101 } 102 103 private void input(ByteBuffer inbuf) 104 { 105 //logError("inbuf : %s",inbuf.array()); 106 // ByteBuffer tmp = inbuf; 107 // logError("inbuf : %s",inbuf.getRemaining()); 108 flushHeldFrame(); 109 if (_heldFrame !is null) 110 { 111 return; 112 } 113 114 TransportException frameParsingError = null; 115 int size = _size; 116 State state = _state; 117 ByteBuffer oldIn = null; 118 119 bool transportAccepting = true; 120 121 while(inbuf.hasRemaining() && state != State.ERROR && transportAccepting) 122 { 123 switch(state) 124 { 125 case State.HEADER0: 126 if(inbuf.hasRemaining()) 127 { 128 byte c = inbuf.get(); 129 if(c != AmqpHeader.HEADER[0]) 130 { 131 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 132 state = State.ERROR; 133 break; 134 } 135 state = State.HEADER1; 136 goto case; 137 } 138 else 139 { 140 break; 141 } 142 case State.HEADER1: 143 if(inbuf.hasRemaining()) 144 { 145 byte c = inbuf.get(); 146 if(c != AmqpHeader.HEADER[1]) 147 { 148 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 149 state = State.ERROR; 150 break; 151 } 152 state = State.HEADER2; 153 goto case; 154 } 155 else 156 { 157 break; 158 } 159 case State.HEADER2: 160 if(inbuf.hasRemaining()) 161 { 162 byte c = inbuf.get(); 163 if(c != AmqpHeader.HEADER[2]) 164 { 165 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 166 state = State.ERROR; 167 break; 168 } 169 state = State.HEADER3; 170 goto case; 171 } 172 else 173 { 174 break; 175 } 176 case State.HEADER3: 177 if(inbuf.hasRemaining()) 178 { 179 byte c = inbuf.get(); 180 if(c != AmqpHeader.HEADER[3]) 181 { 182 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 183 state = State.ERROR; 184 break; 185 } 186 state = State.HEADER4; 187 goto case; 188 } 189 else 190 { 191 break; 192 } 193 case State.HEADER4: 194 if(inbuf.hasRemaining()) 195 { 196 byte c = inbuf.get(); 197 if(c != AmqpHeader.HEADER[4]) 198 { 199 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 200 state = State.ERROR; 201 break; 202 } 203 state = State.HEADER5; 204 goto case; 205 } 206 else 207 { 208 break; 209 } 210 case State.HEADER5: 211 if(inbuf.hasRemaining()) 212 { 213 byte c = inbuf.get(); 214 if(c != AmqpHeader.HEADER[5]) 215 { 216 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 217 state = State.ERROR; 218 break; 219 } 220 state = State.HEADER6; 221 goto case; 222 } 223 else 224 { 225 break; 226 } 227 case State.HEADER6: 228 if(inbuf.hasRemaining()) 229 { 230 byte c = inbuf.get(); 231 if(c != AmqpHeader.HEADER[6]) 232 { 233 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 234 state = State.ERROR; 235 break; 236 } 237 state = State.HEADER7; 238 goto case; 239 } 240 else 241 { 242 break; 243 } 244 case State.HEADER7: 245 if(inbuf.hasRemaining()) 246 { 247 byte c = inbuf.get(); 248 if(c != AmqpHeader.HEADER[7]) 249 { 250 frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s"); 251 state = State.ERROR; 252 break; 253 } 254 255 logHeader(); 256 257 state = State.SIZE_0; 258 goto case; 259 } 260 else 261 { 262 break; 263 } 264 case State.SIZE_0: 265 if(!inbuf.hasRemaining()) 266 { 267 break; 268 } 269 if(inbuf.remaining() >= 4) 270 { 271 size = inbuf.getInt(); 272 state = State.PRE_PARSE; 273 break; 274 } 275 else 276 { 277 size = (inbuf.get() << 24) & 0xFF000000; 278 if(!inbuf.hasRemaining()) 279 { 280 state = State.SIZE_1; 281 break; 282 } 283 } 284 goto case; 285 case State.SIZE_1: 286 size |= (inbuf.get() << 16) & 0xFF0000; 287 if(!inbuf.hasRemaining()) 288 { 289 state = State.SIZE_2; 290 break; 291 } 292 goto case; 293 case State.SIZE_2: 294 size |= (inbuf.get() << 8) & 0xFF00; 295 if(!inbuf.hasRemaining()) 296 { 297 state = State.SIZE_3; 298 break; 299 } 300 goto case; 301 case State.SIZE_3: 302 size |= inbuf.get() & 0xFF; 303 state = State.PRE_PARSE; 304 goto case; 305 case State.PRE_PARSE: 306 if(size < 8) 307 { 308 frameParsingError = new TransportException("specified frame size %d smaller than minimum frame header "); 309 state = State.ERROR; 310 break; 311 } 312 313 if (_localMaxFrameSize > 0 && size > _localMaxFrameSize) 314 { 315 frameParsingError = new TransportException("specified frame size %d greater than maximum valid frame size %d"); 316 state = State.ERROR; 317 break; 318 } 319 320 if(inbuf.remaining() < size-4) 321 { 322 _frameBuffer = BufferUtils.allocate(size-4); 323 _frameBuffer.put(inbuf); 324 state = State.BUFFERING; 325 break; 326 } 327 goto case; 328 case State.BUFFERING: 329 if(_frameBuffer !is null) 330 { 331 if(inbuf.remaining() < _frameBuffer.remaining()) 332 { 333 _frameBuffer.put(inbuf); 334 break; 335 } 336 else 337 { 338 ByteBuffer dup = inbuf.duplicate(); 339 dup.limit(dup.position()+_frameBuffer.remaining()); 340 inbuf.position(inbuf.position()+_frameBuffer.remaining()); 341 _frameBuffer.put(dup); 342 oldIn = inbuf; 343 _frameBuffer.flip(); 344 inbuf = _frameBuffer; 345 state = State.PARSING; 346 } 347 } 348 goto case; 349 case State.PARSING: 350 351 int dataOffset = (inbuf.get() << 2) & 0x3FF; 352 353 if(dataOffset < 8) 354 { 355 frameParsingError = new TransportException("specified frame data offset %d smaller than minimum frame header size "); 356 state = State.ERROR; 357 break; 358 } 359 else if(dataOffset > size) 360 { 361 frameParsingError = new TransportException("specified frame data offset %d larger than the frame size "); 362 state = State.ERROR; 363 break; 364 } 365 366 // type 367 368 int type = inbuf.get() & 0xFF; 369 int channel = inbuf.getShort() & 0xFFFF; 370 371 if(type != 0) 372 { 373 frameParsingError = new TransportException("unknown frame type"); 374 state = State.ERROR; 375 break; 376 } 377 378 // note that this skips over the extended header if it's present 379 if(dataOffset!=8) 380 { 381 inbuf.position(inbuf.position()+dataOffset-8); 382 } 383 384 // oldIn null iff not working on duplicated buffer 385 int frameBodySize = size - dataOffset; 386 if(oldIn is null) 387 { 388 oldIn = inbuf; 389 inbuf = inbuf.duplicate(); 390 int endPos = inbuf.position() + frameBodySize; 391 inbuf.limit(endPos); 392 oldIn.position(endPos); 393 394 } 395 396 try 397 { 398 _framesInput += 1; 399 400 Binary payload = null; 401 Object val = null; 402 403 if (frameBodySize > 0) 404 { 405 _decoder.setByteBuffer(inbuf); 406 val = _decoder.readObject(); 407 _decoder.setByteBuffer(null); 408 409 if(inbuf.hasRemaining()) 410 { 411 byte[] payloadBytes = new byte[inbuf.remaining()]; 412 inbuf.get(payloadBytes); 413 payload = new Binary(payloadBytes); 414 } 415 else 416 { 417 payload = null; 418 } 419 } 420 else 421 { 422 val = EmptyFrame.INSTANCE; 423 } 424 425 FrameBody frameBody = cast(FrameBody) val; 426 if(frameBody !is null) 427 { 428 version(HUNT_AMQP_DEBUG) { 429 tracef("IN: CH[%d] : %s, %s", channel, frameBody, 430 payload is null ? "" : ", [" ~ payload.toString() ~ "]"); 431 } 432 TransportFrame frame = new TransportFrame(channel, frameBody, payload); 433 434 if(_frameHandler.isHandlingFrames()) 435 { 436 _tail_closed = _frameHandler.handleFrame(frame); 437 } 438 else 439 { 440 transportAccepting = false; 441 _heldFrame = frame; 442 } 443 } 444 else 445 { 446 logError("Frameparser encountered a null"); 447 //throw new TransportException("Frameparser encountered a " 448 // + (val is null? "null" : val.getClass()) 449 // + " which is not a " + FrameBody.class); 450 } 451 452 reset(); 453 inbuf = oldIn; 454 oldIn = null; 455 _frameBuffer = null; 456 state = State.SIZE_0; 457 } 458 catch (DecodeException ex) 459 { 460 state = State.ERROR; 461 frameParsingError = new TransportException(ex); 462 } 463 break; 464 case State.ERROR: 465 break; 466 // do nothing 467 default: 468 break; 469 } 470 471 } 472 473 if (_tail_closed) 474 { 475 if (inbuf.hasRemaining()) { 476 state = State.ERROR; 477 frameParsingError = new TransportException("framing error"); 478 } else if (state != State.SIZE_0) { 479 state = State.ERROR; 480 frameParsingError = new TransportException("connection aborted"); 481 } else { 482 _frameHandler.closed(null); 483 } 484 } 485 486 _state = state; 487 _size = size; 488 489 if(_state == State.ERROR) 490 { 491 _tail_closed = true; 492 if(frameParsingError !is null) 493 { 494 _parsingError = frameParsingError; 495 _frameHandler.closed(frameParsingError); 496 } 497 else 498 { 499 throw new TransportException("Unable to parse, probably because of a previous error"); 500 } 501 } 502 } 503 504 override 505 public int capacity() 506 { 507 if (_tail_closed) { 508 return Transport.END_OF_STREAM; 509 } else { 510 if (_inputBuffer !is null) { 511 return _inputBuffer.remaining(); 512 } else { 513 return _inputBufferSize; 514 } 515 } 516 } 517 518 override 519 public int position() { 520 if (_tail_closed) { 521 return Transport.END_OF_STREAM; 522 } 523 return (_inputBuffer is null) ? 0 : _inputBuffer.position(); 524 } 525 526 override 527 public ByteBuffer tail() 528 { 529 if (_tail_closed) { 530 throw new TransportException("tail closed"); 531 } 532 533 if (_inputBuffer is null) { 534 _inputBuffer = ByteBufferUtils.newWriteableBuffer(_inputBufferSize); 535 } 536 537 return _inputBuffer; 538 } 539 540 override 541 public void process() 542 { 543 if (_inputBuffer !is null) 544 { 545 _inputBuffer.flip(); 546 547 try 548 { 549 input(_inputBuffer); 550 } 551 finally 552 { 553 if (_inputBuffer.hasRemaining()) { 554 _inputBuffer.compact(); 555 } else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) { 556 _inputBuffer = null; 557 } else { 558 _inputBuffer.clear(); 559 } 560 } 561 } 562 else 563 { 564 input(_emptyInputBuffer); 565 } 566 } 567 568 override 569 public void close_tail() 570 { 571 _tail_closed = true; 572 process(); 573 } 574 575 /** 576 * Attempt to flush any cached data to the frame transport. This function 577 * is useful if the {@link FrameHandler} state has changed. 578 */ 579 public void flush() 580 { 581 flushHeldFrame(); 582 583 if (_heldFrame is null) 584 { 585 process(); 586 } 587 } 588 589 private void flushHeldFrame() 590 { 591 if(_heldFrame !is null && _frameHandler.isHandlingFrames()) 592 { 593 _tail_closed = _frameHandler.handleFrame(_heldFrame); 594 _heldFrame = null; 595 } 596 } 597 598 private void reset() 599 { 600 _size = 0; 601 _state = State.SIZE_0; 602 } 603 604 long getFramesInput() 605 { 606 return _framesInput; 607 } 608 609 private void logHeader() { 610 if (_transport.isFrameTracingEnabled()) { 611 _transport.log(TransportImpl.INCOMING, new String(HEADER_DESCRIPTION)); 612 613 ProtocolTracer tracer = _transport.getProtocolTracer(); 614 if (tracer !is null) { 615 tracer.receivedHeader(HEADER_DESCRIPTION); 616 } 617 } 618 } 619 }