View Javadoc

1   /*
2    * @(#) ThreadedEventProducer.java Jul 12, 2004
3    * 
4    * Copyright (c) 2003-2005 Delft University of Technology, Jaffalaan 5, 2628 BX
5    * Delft, the Netherlands. All rights reserved.
6    * 
7    * See for project information <a href="http://www.simulation.tudelft.nl/">
8    * www.simulation.tudelft.nl </a>.
9    * 
10   * The source code and binary code of this software is proprietary information
11   * of Delft University of Technology.
12   */
13  
14  package org.gscg.common.interactionlayer;
15  
16  import java.io.IOException;
17  import java.io.ObjectOutputStream;
18  import java.io.Serializable;
19  import java.lang.reflect.Field;
20  import java.rmi.ConnectException;
21  import java.rmi.RemoteException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  
28  import nl.tudelft.simulation.event.Event;
29  import nl.tudelft.simulation.event.EventInterface;
30  import nl.tudelft.simulation.event.EventListenerInterface;
31  import nl.tudelft.simulation.event.EventProducer;
32  import nl.tudelft.simulation.language.reflection.ClassUtil;
33  import nl.tudelft.simulation.logger.Logger;
34  
35  import org.gscg.common.DijkstraSemaphore;
36  
37  /***
38   * The ThreadedEventProducer extends the EventProducer. A different thread than
39   * the one used in the simulation is used to fire events to a client-side
40   * EventListener. Whenever the processing of an event client-side takes a long
41   * time, the simulation will continue and not wait. Meanwhile, other events are
42   * stored until the 'slow' event has been processed client-side. After this, the
43   * ThreadedEventProducer will try to catch up.
44   * <p>
45   * 
46   * Copyright (c) 2003-2005 Delft University of Technology, Jaffalaan 5, 2628 BX
47   * Delft, the Netherlands. All rights reserved.
48   * 
49   * See for project information <a href="http://www.simulation.tudelft.nl/">
50   * www.simulation.tudelft.nl </a>.
51   * 
52   * The source code and binary code of this software is proprietary information
53   * of Delft University of Technology.
54   * 
55   * @see nl.tudelft.simulation.event.EventProducer
56   * 
57   * @author <a
58   *         href="http://www.tbm.tudelft.nl/webstaf/stijnh/index.htm">Stijn-Pieter
59   *         van Houten </a>
60   * @author <a
61   *         href="http://www.tbm.tudelft.nl/webstaf/alexandv/index.htm">Alexander
62   *         Verbraeck </a>
63   * 
64   * @version $Revision: 1.1 $ $Date: 2005/06/16 12:33:58 $
65   * @since 1.0.0
66   */
67  public class ThreadedEventProducer extends EventProducer
68  {
69  	/*** the dijkstra semaphore, value is adjusted in the model class */
70  	public transient static DijkstraSemaphore DIJKSTRASEMAPHORE = new DijkstraSemaphore(
71  			1);
72  
73  	/*** the serial version uid */
74  	private static final long serialVersionUID = 13L;
75  
76  	/*** the source field of an event */
77  	private static Field sourceField = null;
78  
79  	static
80  	{
81  		try
82  		{
83  			ThreadedEventProducer.sourceField = ClassUtil.resolveField(
84  					Event.class, "source");
85  			sourceField.setAccessible(true);
86  		} catch (Exception exception)
87  		{
88  			Logger.severe(ThreadedEventProducer.class, "<clinit>", exception);
89  		}
90  	}
91  
92  	/*** this thread executes events */
93  	private transient NotifyThread notifyThread = null;
94  
95  	/*** the list with events to fire */
96  	private transient List events = Collections
97  			.synchronizedList(new ArrayList());
98  
99  	/***
100 	 * constructs a new ThreadedEventProducer
101 	 */
102 	public ThreadedEventProducer()
103 	{
104 		super();
105 		this.notifyThread = new NotifyThread(this);
106 		// since we use -Xss2m as an argument for the VM
107 		// in order to support serialization.
108 		// here we create a thread with stack size 1;
109 		// as such forcing the VM to use the default value (128k)
110 		// for the stack size for these threads instead of 2m
111 		Thread thread = new Thread(null, this.notifyThread, "", 1);
112 		thread.start();
113 	}
114 
115 	/***
116 	 * @see nl.tudelft.simulation.event.EventProducer#fireEvent(nl.tudelft.simulation.event.EventListenerInterface,
117 	 *      nl.tudelft.simulation.event.EventInterface)
118 	 */
119 	protected synchronized EventInterface fireEvent(
120 			final EventListenerInterface listener, final EventInterface event)
121 			throws RemoteException
122 	{
123 		this.notifyThread.add(listener, event);
124 		return event;
125 	}
126 
127 	//
128 	// private methods
129 	//
130 	/***
131 	 * writes a serializable method to stream
132 	 * 
133 	 * @param out the outputstream
134 	 * @throws IOException on IOException
135 	 */
136 	private synchronized void writeObject(final ObjectOutputStream out)
137 			throws IOException
138 	{
139 		out.defaultWriteObject();
140 		// we write the maximum count
141 		out.writeObject(new Integer(ThreadedEventProducer.DIJKSTRASEMAPHORE
142 				.getMaximumCount()));
143 	}
144 
145 	/***
146 	 * reads a serializable method from stream
147 	 * 
148 	 * @param in the inputstream
149 	 */
150 	private synchronized void readObject(final java.io.ObjectInputStream in)
151 	{
152 		try
153 		{
154 			in.defaultReadObject();
155 			Integer maximumCount = (Integer) in.readObject();
156 
157 			// custom deserialize methods
158 			this.events = Collections.synchronizedList(new ArrayList());
159 
160 			// set the maximum count of the dijkstra semaphore
161 			ThreadedEventProducer.DIJKSTRASEMAPHORE
162 					.setMaximumCount(maximumCount.intValue());
163 
164 			this.notifyThread = new NotifyThread(this);
165 			Thread thread = new Thread(null, this.notifyThread, "", 1);
166 			thread.start();
167 		} catch (Exception exception)
168 		{
169 			Logger.severe(this, "readObject", exception);
170 		}
171 	}
172 
173 	/***
174 	 * A combination of a reference and an event.
175 	 * <p>
176 	 * (c) copyright 2005 <a href="http://www.simulation.tudelft.nl">Delft
177 	 * University of Technology </a>, the Netherlands. <br>
178 	 * See for project information <a
179 	 * href="http://www.simulation.tudelft.nl">www.simulation.tudelft.nl </a>
180 	 * <br>
181 	 * 
182 	 * Copyright (c) 2003-2005 Delft University of Technology, Jaffalaan 5, 2628
183 	 * BX Delft, the Netherlands. All rights reserved.
184 	 * 
185 	 * See for project information <a href="http://www.simulation.tudelft.nl/">
186 	 * www.simulation.tudelft.nl </a>.
187 	 * 
188 	 * The source code and binary code of this software are proprietary
189 	 * information of Delft University of Technology.
190 	 * 
191 	 * @author <a
192 	 *         href="http://www.tbm.tudelft.nl/webstaf/stijnh/index.htm">Stijn-Pieter
193 	 *         van Houten </a>
194 	 * @version $Revision: 1.1 $ $Date: 2005/06/16 12:33:58 $
195 	 * @since 1.1.3
196 	 */
197 	private class EventReference implements Serializable
198 	{
199 		/*** the serial version uid */
200 		private static final long serialVersionUID = 11L;
201 
202 		/*** the reference */
203 		private EventListenerInterface reference;
204 
205 		/*** the event */
206 		private EventInterface event;
207 
208 		/***
209 		 * constructs a new EventReference
210 		 * 
211 		 * @param event the event
212 		 * @param reference the reference
213 		 */
214 		public EventReference(final EventInterface event,
215 				final EventListenerInterface reference)
216 		{
217 			this.event = event;
218 			this.reference = reference;
219 		}
220 
221 		/***
222 		 * @return Returns the event.
223 		 * 
224 		 */
225 		public EventInterface getEvent()
226 		{
227 			return this.event;
228 		}
229 
230 		/***
231 		 * @return Returns the reference.
232 		 * 
233 		 */
234 		public EventListenerInterface getReference()
235 		{
236 			return this.reference;
237 		}
238 	}
239 
240 	/***
241 	 * The NotifyThread fires the event and notifies a client-side remote
242 	 * object.
243 	 * <p>
244 	 * Copyright (c) 2003-2005 Delft University of Technology, Jaffalaan 5, 2628
245 	 * BX Delft, the Netherlands. All rights reserved.
246 	 * 
247 	 * See for project information <a href="http://www.simulation.tudelft.nl/">
248 	 * www.simulation.tudelft.nl </a>.
249 	 * 
250 	 * The source code and binary code of this software is proprietary
251 	 * information of Delft University of Technology. *
252 	 * 
253 	 * @author <a
254 	 *         href="http://www.tbm.tudelft.nl/webstaf/stijnh/index.htm">Stijn-Pieter
255 	 *         van Houten </a>
256 	 * @author <a
257 	 *         href="http://www.tbm.tudelft.nl/webstaf/alexandv/index.htm">Alexander
258 	 *         Verbraeck </a>
259 	 * @version $Revision: 1.1 $ $Date: 2005/06/16 12:33:58 $
260 	 * @since 1.0.0
261 	 */
262 	private class NotifyThread implements Runnable
263 	{
264 		/*** the number of events, used for performance measuring */
265 		private int number = 0;
266 
267 		/*** the counter for the total number of events */
268 		private int counter = 0;
269 
270 		/*** the size of the content of events in kb sent to remote clients */
271 		private int size = 0;
272 
273 		/***
274 		 * indicates whether to measure performance (for output stream size;
275 		 * slows down the simulation)
276 		 */
277 		private boolean performance = false;
278 
279 		/*** the owner of the thread */
280 		private ThreadedEventProducer owner = null;
281 
282 		/*** the set with dead references */
283 		private Set deadReferences = new HashSet();
284 
285 		/***
286 		 * constructs a new NotifyThread
287 		 * 
288 		 * @param owner the owner
289 		 */
290 		public NotifyThread(final ThreadedEventProducer owner)
291 		{
292 			super();
293 			this.owner = owner;
294 		}
295 
296 		/***
297 		 * Method add.
298 		 * 
299 		 * @param reference the reference
300 		 * @param event the event
301 		 */
302 		public void add(final EventListenerInterface reference,
303 				final EventInterface event)
304 		{
305 			if (!this.deadReferences.contains(reference))
306 			{
307 				this.owner.events.add(new EventReference(event, reference));
308 				if (this.performance)
309 				{
310 					this.number++;
311 					this.counter++;
312 					// we measure the output stream
313 					this.size += ObjectMeter.getObjectSize(event.getContent());
314 					if (this.counter % 10 == 0)
315 					{
316 						System.out.println("ThreadedEventProducer: Number: "
317 								+ this.number);
318 						System.out.println("ThreadedEventProducer: Size: "
319 								+ this.size / 1024 + " kb");
320 					}
321 				}
322 			} else
323 			{
324 				// the reference died.....
325 				this.owner.removeListener(reference, event.getType());
326 			}
327 		}
328 
329 		/***
330 		 * @see java.lang.Runnable#run()
331 		 */
332 		public void run()
333 		{
334 			while (true)
335 			{
336 				try
337 				{
338 					if (this.owner.events.size() > 0)
339 					{
340 						// we get our handle from the dijkstra semaphore
341 						// during serialization no handles are available,
342 						// leading to a pause of this thread
343 						ThreadedEventProducer.DIJKSTRASEMAPHORE.acquire();
344 						{
345 							while (this.owner.events.size() > 0)
346 							{
347 								try
348 								{
349 									EventReference er = (EventReference) this.owner.events
350 											.remove(0);
351 									if (this.performance)
352 									{
353 										this.number--;
354 										this.counter++;
355 										if (this.counter % 10 == 0)
356 										{
357 											System.out
358 													.println("ThreadedEventProducer: Number: "
359 															+ this.number);
360 											System.out
361 													.println("ThreadedEventProducer: Size: "
362 															+ this.size
363 															/ 1024
364 															+ " kb");
365 										}
366 									}
367 									if (!this.deadReferences.contains(er
368 											.getReference()))
369 									{
370 										try
371 										{
372 											// replace the source of the
373 											// event
374 											ThreadedEventProducer.sourceField
375 													.set(er.getEvent(),
376 															this.owner);
377 											// invoke notify
378 											er.getReference().notify(
379 													er.getEvent());
380 										} catch (ConnectException connectException)
381 										{
382 											if (this.performance)
383 											{
384 												System.out
385 														.println("ThreadedEventProducer: Reference died: "
386 																+ er
387 																		.getReference()
388 																+ " event: "
389 																+ er.getEvent());
390 											}
391 											this.deadReferences.add(er
392 													.getReference());
393 											// we ignore this exception
394 											// probably the client
395 											// application is dead
396 											this.owner.removeListener(er
397 													.getReference(), er
398 													.getEvent().getType());
399 										}
400 									}
401 								} catch (Exception exception)
402 								{
403 									nl.tudelft.simulation.logger.Logger.severe(
404 											this, "run", exception);
405 								}
406 							}
407 						}
408 						ThreadedEventProducer.DIJKSTRASEMAPHORE.release();
409 					}
410 					try
411 					{
412 						// we sleep one nanosecond and see whether we need
413 						// to fire an event
414 						Thread.sleep(1, 0);
415 					} catch (InterruptedException interruptedException)
416 					{
417 						nl.tudelft.simulation.logger.Logger.severe(this, "run",
418 								interruptedException);
419 					}
420 				} catch (Exception exception)
421 				{
422 					ThreadedEventProducer.DIJKSTRASEMAPHORE.release();
423 					nl.tudelft.simulation.logger.Logger.severe(this, "run",
424 							exception);
425 				}
426 			}
427 		}
428 	}
429 }