/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.ReceiverImpl;

import hunt.proton.amqp.UnsignedInteger;
import hunt.proton.codec.ReadableBuffer;
import hunt.proton.codec.WritableBuffer;
import hunt.proton.engine.Receiver;
import hunt.proton.engine.impl.LinkImpl;
import hunt.proton.engine.impl.DeliveryImpl;
import hunt.proton.engine.impl.TransportReceiver;
import hunt.proton.engine.impl.SessionImpl;
import hunt.Exceptions;


/**
 * Receiver-side link: extends LinkImpl and implements Receiver.
 *
 * Besides the state inherited from LinkImpl, it keeps a running total of the
 * credit passed to flow() that has not yet been collected through
 * clearUnsentCredits(), and it adjusts the owning session's incoming byte and
 * delivery counters whenever delivery data is read or a delivery is advanced.
 */
class ReceiverImpl : LinkImpl, Receiver
{
    /**
     * Set to true by setDrain() and to false by drain() (and again by flow()
     * right after it resets the drain flag). flow() only resets the drain
     * flag while this is false.
     */
    private bool _drainFlagMode = true;

    /// Transport-level counterpart, assigned through setTransportLink().
    private TransportReceiver _transportReceiver;

    /// Credit added by flow() since the last clearUnsentCredits() call.
    private int _unsentCredits;

    /**
     * Creates a receiver by forwarding both arguments to the LinkImpl
     * constructor.
     */
    this(SessionImpl session, string name)
    {
        super(session, name);
    }

    /**
     * Marks the current delivery (if there is one) as done and then lets
     * LinkImpl.advance() run.
     *
     * When the base class reports that it advanced, the queued and credit
     * counters are decremented, the session's incoming byte count is reduced
     * by the finished delivery's pending() value, its incoming delivery count
     * is reduced by one, and modified() is called if the transport session's
     * incoming window size is zero.
     *
     * Returns: whatever LinkImpl.advance() returned.
     */
    public override bool advance()
    {
        DeliveryImpl finished = current();
        if (finished !is null)
        {
            finished.setDone();
        }

        const bool moved = super.advance();
        if (!moved)
        {
            return false;
        }

        decrementQueued();
        decrementCredit();

        auto owner = getSession();
        owner.incrementIncomingBytes(-finished.pending());
        owner.incrementIncomingDeliveries(-1);
        flagIfIncomingWindowIsZero();

        return true;
    }

    /**
     * Adds `credits` to the link credit and to the unsent-credit total, then
     * calls modified().
     *
     * If _drainFlagMode is false, the drain flag is additionally reset via
     * setDrain(false).
     */
    public override void flow(int credits)
    {
        addCredit(credits);
        _unsentCredits += credits;
        modified();

        if (_drainFlagMode)
        {
            return;
        }

        // setDrain() in this class turns _drainFlagMode back on, so it is
        // switched off again straight afterwards.
        setDrain(false);
        _drainFlagMode = false;
    }

    /**
     * Resets the unsent-credit total to zero.
     *
     * Returns: the total as it was before the reset.
     */
    int clearUnsentCredits()
    {
        const int outstanding = _unsentCredits;
        _unsentCredits = 0;
        return outstanding;
    }

    /**
     * Delegates to the current delivery's recv(bytes, offset, size).
     *
     * A positive result is subtracted from the session's incoming byte count,
     * after which modified() is called if the transport session's incoming
     * window size is zero.
     *
     * Returns: the value returned by the current delivery.
     * Throws: IllegalStateException if there is no current delivery.
     */
    public override int recv(byte[] bytes, int offset, int size)
    {
        requireCurrentDelivery();

        const int taken = _current.recv(bytes, offset, size);
        if (taken <= 0)
        {
            return taken;
        }

        getSession().incrementIncomingBytes(-taken);
        flagIfIncomingWindowIsZero();
        return taken;
    }

    /**
     * Delegates to the current delivery's recv(buffer).
     *
     * A positive result is subtracted from the session's incoming byte count,
     * after which modified() is called if the transport session's incoming
     * window size is zero.
     *
     * Returns: the value returned by the current delivery.
     * Throws: IllegalStateException if there is no current delivery.
     */
    public override int recv(WritableBuffer buffer)
    {
        requireCurrentDelivery();

        const int taken = _current.recv(buffer);
        if (taken <= 0)
        {
            return taken;
        }

        getSession().incrementIncomingBytes(-taken);
        flagIfIncomingWindowIsZero();
        return taken;
    }

    /**
     * Delegates to the current delivery's recv().
     *
     * If the returned buffer reports remaining() greater than zero, that
     * amount is subtracted from the session's incoming byte count, after
     * which modified() is called if the transport session's incoming window
     * size is zero.
     *
     * Returns: the buffer returned by the current delivery.
     * Throws: IllegalStateException if there is no current delivery.
     */
    public override ReadableBuffer recv()
    {
        requireCurrentDelivery();

        ReadableBuffer payload = _current.recv();
        if (payload.remaining() > 0)
        {
            getSession().incrementIncomingBytes(-payload.remaining());
            flagIfIncomingWindowIsZero();
        }
        return payload;
    }

    /**
     * Passes this receiver to the session's freeReceiver() and then delegates
     * to LinkImpl.doFree().
     */
    override void doFree()
    {
        getSession().freeReceiver(this);
        super.doFree();
    }

    /// Not implemented yet: always returns false.
    bool hasIncoming()
    {
        return false; //TODO - Implement
    }

    /// Stores the transport-level receiver later returned by getTransportLink().
    void setTransportLink(TransportReceiver transportReceiver)
    {
        _transportReceiver = transportReceiver;
    }

    /// Returns the receiver stored by setTransportLink() (null if never set).
    override TransportReceiver getTransportLink()
    {
        return _transportReceiver;
    }

    /**
     * Turns the drain flag on, grants `credit` through flow(), and finally
     * clears _drainFlagMode.
     */
    public override void drain(int credit)
    {
        setDrain(true);
        flow(credit);
        _drainFlagMode = false;
    }

    /**
     * Returns: true when the drain flag is set and the credit count is
     * greater than the queued count; false otherwise.
     */
    public override bool draining()
    {
        if (!getDrain())
        {
            return false;
        }
        return getCredit() > getQueued();
    }

    /**
     * Forwards `drain` to LinkImpl.setDrain(), calls modified(), and sets
     * _drainFlagMode to true.
     */
    public override void setDrain(bool drain)
    {
        super.setDrain(drain);
        modified();
        _drainFlagMode = true;
    }

    /**
     * Returns: the credit count minus the queued count.
     */
    public override int getRemoteCredit()
    {
        // Credit only goes down when advance() is called on a received
        // message, so the queued count has to be taken into account as well.
        auto credit = getCredit();
        auto queued = getQueued();
        return credit - queued;
    }

    /// Throws IllegalStateException when _current is null.
    private void requireCurrentDelivery()
    {
        if (_current is null)
        {
            throw new IllegalStateException("no current delivery");
        }
    }

    /**
     * Calls modified() when the transport session's incoming window size
     * compares equal to UnsignedInteger.ZERO.
     */
    private void flagIfIncomingWindowIsZero()
    {
        auto window = getSession().getTransportSession().getIncomingWindowSize();
        if (window == UnsignedInteger.ZERO)
        {
            modified();
        }
    }
}