1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.proton.engine.impl.SenderImpl; 13 14 import hunt.proton.codec.ReadableBuffer; 15 import hunt.proton.engine.EndpointState; 16 import hunt.proton.engine.Sender; 17 import hunt.proton.engine.impl.LinkImpl; 18 import hunt.proton.engine.impl.TransportSender; 19 import hunt.proton.engine.impl.SessionImpl; 20 import hunt.Exceptions; 21 import hunt.proton.engine.impl.DeliveryImpl; 22 import hunt.logging; 23 class SenderImpl : LinkImpl , Sender 24 { 25 private int _offered; 26 private TransportSender _transportLink; 27 28 this(SessionImpl session, string name) 29 { 30 super(session, name); 31 } 32 33 override 34 public void offer(int credits) 35 { 36 _offered = credits; 37 } 38 39 override 40 public int send(byte[] bytes, int offset, int length) 41 { 42 if (getLocalState() == EndpointState.CLOSED) 43 { 44 throw new IllegalStateException("send not allowed after the sender is closed."); 45 } 46 DeliveryImpl current = current(); 47 if (current is null || current.getLink() != this) 48 { 49 throw new IllegalArgumentException();//TODO. 50 } 51 int sent = current.send(bytes, offset, length); 52 if (sent > 0) { 53 getSession().incrementOutgoingBytes(sent); 54 } 55 return sent; 56 } 57 58 override 59 public int send(ReadableBuffer buffer) 60 { 61 if (getLocalState() == EndpointState.CLOSED) 62 { 63 throw new IllegalStateException("send not allowed after the sender is closed."); 64 } 65 DeliveryImpl current = current(); 66 if (current is null || current.getLink() != this) 67 { 68 throw new IllegalArgumentException(); 69 } 70 int sent = current.send(buffer); 71 if (sent > 0) { 72 getSession().incrementOutgoingBytes(sent); 73 } 74 return sent; 75 } 76 77 override 78 public int sendNoCopy(ReadableBuffer buffer) 79 { 80 if (getLocalState() == EndpointState.CLOSED) 81 { 82 throw new IllegalStateException("send not allowed after the sender is closed."); 83 } 84 DeliveryImpl current = current(); 85 if (current is null || current.getLink() != this) 86 { 87 throw new IllegalArgumentException(); 88 } 89 int sent = current.sendNoCopy(buffer); 90 if (sent > 0) { 91 getSession().incrementOutgoingBytes(sent); 92 } 93 return sent; 94 } 95 96 override 97 public void abort() 98 { 99 //TODO. 100 } 101 102 override 103 void doFree() 104 { 105 getSession().freeSender(this); 106 super.doFree(); 107 } 108 109 override 110 public bool advance() 111 { 112 DeliveryImpl delivery = current(); 113 if (delivery !is null) { 114 delivery.setComplete(); 115 } 116 117 bool advance = super.advance(); 118 if(advance && _offered > 0) 119 { 120 _offered--; 121 } 122 if(advance) 123 { 124 decrementCredit(); 125 delivery.addToTransportWorkList(); 126 getSession().incrementOutgoingDeliveries(1); 127 } 128 129 return advance; 130 } 131 132 bool hasOfferedCredits() 133 { 134 return _offered > 0; 135 } 136 137 override 138 TransportSender getTransportLink() 139 { 140 return _transportLink; 141 } 142 143 void setTransportLink(TransportSender transportLink) 144 { 145 _transportLink = transportLink; 146 } 147 148 149 override 150 public void setCredit(int credit) 151 { 152 super.setCredit(credit); 153 /* while(getQueued()>0 && getCredit()>0) 154 { 155 advance(); 156 }*/ 157 } 158 159 override 160 public int getRemoteCredit() 161 { 162 // Credit is decremented as soon as advance is called on a send, 163 // so we need only consider the credit count, not the queued count. 164 return getCredit(); 165 } 166 }