1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.proton.engine.Reactor; 13 14 //import java.io.IOException; 15 //import java.util.Set; 16 import hunt.proton.engine.ReactorChild; 17 import hunt.proton.engine.BaseHandler; 18 import hunt.proton.engine.Collector; 19 import hunt.proton.engine.Connection; 20 import hunt.proton.engine.Event; 21 import hunt.proton.engine.Handler; 22 import hunt.proton.engine.HandlerException; 23 import hunt.proton.engine.Record; 24 //import hunt.proton.reactor.ReactorOptions; 25 //import hunt.proton.reactor.impl.ReactorImpl; 26 import hunt.collection.Set; 27 import hunt.proton.engine.Task; 28 import hunt.proton.engine.Selectable; 29 import hunt.String; 30 31 /** 32 * The proton reactor provides a general purpose event processing 33 * library for writing reactive programs. A reactive program is defined 34 * by a set of event handlers. An event handler is just any class or 35 * object that extends the Handler interface. For convenience, a class 36 * can extend {@link BaseHandler} and only handle the events that it cares to 37 * implement methods for. 38 * <p> 39 * This class is not thread safe (with the exception of the {@link #wakeup()} 40 * method) and should only be used by a single thread at any given time. 41 */ 42 interface Reactor { 43 44 class Factory 45 { 46 //public static Reactor create() { 47 // return new ReactorImpl(); 48 //} 49 // 50 //public static Reactor create(ReactorOptions options) { 51 // return new ReactorImpl(options); 52 //} 53 } 54 55 /** 56 * Updates the last time that the reactor's state has changed, potentially 57 * resulting in events being generated. 58 * @return the current time in milliseconds 59 * {@link System#currentTimeMillis()}. 60 */ 61 long mark(); 62 63 /** @return the last time that {@link #mark()} was called. */ 64 long now(); 65 66 /** @return an instance of {@link Record} that can be used to associate 67 * other objects (attachments) with this instance of the 68 * Reactor class. 69 */ 70 Record attachments(); 71 72 /** 73 * The value the reactor will use for {@link Selector#select(long)} that is called as part of {@link #process()}. 74 * 75 * @param timeout a timeout value in milliseconds, to associate with this instance of 76 * the reactor. This can be retrieved using the 77 * {@link #getTimeout()} method 78 */ 79 void setTimeout(long timeout); 80 81 /** 82 * @return the value previously set using {@link #setTimeout(long)} or 83 * 0 if no previous value has been set. 84 */ 85 long getTimeout(); 86 87 /** 88 * @return the global handler for this reactor. Every event the reactor 89 * sees is dispatched to the global handler. To receive every 90 * event generated by the reactor, associate a child handler 91 * with the global handler. For example: 92 * <pre> 93 * getGlobalHandler().add(yourHandler); 94 * </pre> 95 */ 96 Handler getGlobalHandler(); 97 98 /** 99 * Sets a new global handler. You probably don't want to do this and 100 * would be better adding a handler to the value returned by the 101 * {{@link #getGlobalHandler()} method. 102 * @param handler the new global handler. 103 */ 104 void setGlobalHandler(Handler handler); 105 106 /** 107 * @return the handler for this reactor. Every event the reactor sees, 108 * which is not handled by a child of the reactor (such as a 109 * timer, connection, acceptor, or selector) is passed to this 110 * handler. To receive these events, it is recommend that you 111 * associate a child handler with the handler returned by this 112 * method. For example: 113 * <pre> 114 * getHandler().add(yourHandler); 115 * </pre> 116 */ 117 Handler getHandler(); 118 119 /** 120 * Sets a new handler, that will receive any events not handled by a child 121 * of the reactor. Note that setting a handler via this method replaces 122 * the previous handler, and will result in no further events being 123 * dispatched to the child handlers associated with the previous handler. 124 * For this reason it is recommended that you do not use this method and 125 * instead add child handlers to the value returned by the 126 * {@link #getHandler()} method. 127 * @param handler the new handler for this reactor. 128 */ 129 void setHandler(Handler handler); 130 131 /** 132 * @return a set containing the child objects associated with this reactor. 133 * This will contain any active instances of: {@link Task} - 134 * created using the {@link #schedule(int, Handler)} method, 135 * {@link Connection} - created using the 136 * {@link #connectionToHost(String, int, Handler)} method, 137 * {@link Acceptor} - created using the 138 * {@link #acceptor(String, int)} method, 139 * {@link #acceptor(String, int, Handler)} method, or 140 * {@link Selectable} - created using the 141 * {@link #selectable()} method. 142 */ 143 Set!ReactorChild children(); 144 145 /** 146 * @return the Collector used to gather events generated by this reactor. 147 */ 148 Collector collector(); 149 150 /** 151 * Creates a new <code>Selectable</code> as a child of this reactor. 152 * @return the newly created <code>Selectable</code>. 153 */ 154 Selectable selectable(); 155 156 /** 157 * Updates the specified <code>Selectable</code> either emitting a 158 * {@link Type#SELECTABLE_UPDATED} event if the selectable is not terminal, 159 * or {@link Type#SELECTABLE_FINAL} if the selectable is terminal and has 160 * not already emitted a {@link Type#SELECTABLE_FINAL} event. 161 * @param selectable 162 */ 163 void update(Selectable selectable); 164 165 /** 166 * Yields, causing the next call to {@link #process()} to return 167 * successfully - without processing any events. If multiple calls 168 * can be made to <code>yield</code> and only the next invocation of 169 * {@link #process()} will be affected. 170 */ 171 void yield() ; 172 173 /** 174 * @return <code>true</code> if the reactor is in quiesced state (e.g. has 175 * no events to process). <code>false</code> is returned otherwise. 176 */ 177 bool quiesced(); 178 179 /** 180 * Process any events pending for this reactor. Events are dispatched to 181 * the handlers registered with the reactor, or child objects associated 182 * with the reactor. This method blocks until the reactor has no more work 183 * to do (and no more work pending, in terms of scheduled tasks or open 184 * selectors to process). 185 * @return <code>true</code> if the reactor may have more events in the 186 * future. For example: if there are scheduled tasks, or open 187 * selectors. <code>false</code> is returned if the reactor has 188 * (and will have) no more events to process. 189 * @throws HandlerException if an unchecked exception is thrown by one of 190 * the handlers - it will be re-thrown attached to an instance of 191 * <code>HandlerException</code>. 192 */ 193 bool process() ; 194 195 /** 196 * Wakes up the thread (if any) blocked in the {@link #process()} method. 197 * This is the only method of this class that is thread safe, in that it 198 * can be used at the same time as another thread is using the reactor. 199 */ 200 void wakeup(); 201 202 /** 203 * Starts the reactor. This method should be invoked before the first call 204 * to {@link #process()}. 205 */ 206 void start(); 207 208 /** 209 * Stops the reactor. This method should be invoked after the last call to 210 * {@link #process()}. 211 * @throws HandlerException 212 */ 213 void stop() ; 214 215 /** 216 * Simplifies the use of the reactor by wrapping the use of 217 * <code>start</code>, <code>run</code>, and <code>stop</code> method 218 * calls. 219 * <p> 220 * Logically the implementation of this method is: 221 * <pre> 222 * start(); 223 * while(process()) {} 224 * stop(); 225 * </pre> 226 * @throws HandlerException if an unchecked exception is thrown by one of 227 * the handlers - it will be re-thrown attached to an instance of 228 * <code>HandlerException</code>. 229 */ 230 void run() ; 231 232 /** 233 * Schedules execution of a task to take place at some point in the future. 234 * @param delay the number of milliseconds, in the future, to schedule the 235 * task for. 236 * @param handler a handler to associate with the task. This is notified 237 * when the deadline for the task is reached. 238 * @return an object representing the task that has been scheduled. 239 */ 240 Task schedule(int delay, Handler handler); 241 242 /** 243 * Creates a new out-bound connection. 244 * @param handler a handler that is notified when events occur for the 245 * connection. Typically the host and port to connect to 246 * would be supplied to the connection object inside the 247 * logic which handles the {@link Type#CONNECTION_INIT} 248 * event via 249 * {@link #setConnectionHost(Connection, String, int)} 250 * @return the newly created connection object. 251 * @deprecated Use {@link #connectionToHost(String, int, Handler)} instead. 252 */ 253 Connection connection(Handler handler); 254 255 /** 256 * Creates a new out-bound connection to the given host and port. 257 * <p> 258 * This method will cause Reactor to set up a network connection to the 259 * host and create a Connection for it. 260 * @param host the host to connect to (e.g. "localhost") 261 * @param port the port used for the connection. 262 * @param handler a handler that is notified when events occur for the 263 * connection. 264 * @return the newly created connection object. 265 */ 266 Connection connectionToHost(string host, int port, Handler handler); 267 268 /** 269 * Set the host address used by the connection 270 * <p> 271 * This method will set/change the host address used by the Reactor to 272 * create an outbound network connection for the given Connection 273 * @param c the Connection to assign the address to 274 * @param host the address of the host to connect to (e.g. "localhost") 275 * @param port the port to use for the connection. 276 */ 277 void setConnectionHost(Connection c, string host, int port); 278 279 /** 280 * Gets the reactor options. 281 * 282 * @return the reactor options 283 */ 284 //ReactorOptions getOptions(); 285 286 /** 287 * Get the address used by the connection 288 * <p> 289 * This may be used to retrieve the remote peer address. 290 * Note that the returned address may be in numeric IP format. 291 * @param c the Connection 292 * @return a string containing the address in the following format: 293 * <pre> 294 * host[:port] 295 * </pre> 296 */ 297 string getConnectionAddress(Connection c); 298 299 /** 300 * Creates a new acceptor. This is equivalent to calling: 301 * <pre> 302 * acceptor(host, port, null); 303 * </pre> 304 * @param host 305 * @param port 306 * @return the newly created acceptor object. 307 * @throws IOException 308 */ 309 // Acceptor acceptor(String host, int port) throws IOException; 310 311 /** 312 * Creates a new acceptor. This acceptor listens for in-bound connections. 313 * @param host the host name or address of the NIC to listen on. 314 * @param port the port number to listen on. 315 * @param handler if non-<code>null</code> this handler is registered with 316 * each new connection accepted by the acceptor. 317 * @return the newly created acceptor object. 318 * @throws IOException 319 */ 320 //Acceptor acceptor(String host, int port, Handler handler) 321 // throws IOException; 322 323 /** 324 * Frees any resources (such as sockets and selectors) held by the reactor 325 * or its children. 326 */ 327 void free(); 328 }