1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.gscg.common.interactionlayer;
15
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.io.Serializable;
19 import java.lang.reflect.Field;
20 import java.rmi.ConnectException;
21 import java.rmi.RemoteException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27
28 import nl.tudelft.simulation.event.Event;
29 import nl.tudelft.simulation.event.EventInterface;
30 import nl.tudelft.simulation.event.EventListenerInterface;
31 import nl.tudelft.simulation.event.EventProducer;
32 import nl.tudelft.simulation.language.reflection.ClassUtil;
33 import nl.tudelft.simulation.logger.Logger;
34
35 import org.gscg.common.DijkstraSemaphore;
36
37 /***
38 * The ThreadedEventProducer extends the EventProducer. A different thread than
39 * the one used in the simulation is used to fire events to a client-side
40 * EventListener. Whenever the processing of an event client-side takes a long
41 * time, the simulation will continue and not wait. Meanwhile, other events are
42 * stored until the 'slow' event has been processed client-side. After this, the
43 * ThreadedEventProducer will try to catch up.
44 * <p>
45 *
46 * Copyright (c) 2003-2005 Delft University of Technology, Jaffalaan 5, 2628 BX
47 * Delft, the Netherlands. All rights reserved.
48 *
49 * See for project information <a href="http://www.simulation.tudelft.nl/">
50 * www.simulation.tudelft.nl </a>.
51 *
52 * The source code and binary code of this software is proprietary information
53 * of Delft University of Technology.
54 *
55 * @see nl.tudelft.simulation.event.EventProducer
56 *
57 * @author <a
58 * href="http://www.tbm.tudelft.nl/webstaf/stijnh/index.htm">Stijn-Pieter
59 * van Houten </a>
60 * @author <a
61 * href="http://www.tbm.tudelft.nl/webstaf/alexandv/index.htm">Alexander
62 * Verbraeck </a>
63 *
64 * @version $Revision: 1.1 $ $Date: 2005/06/16 12:33:58 $
65 * @since 1.0.0
66 */
67 public class ThreadedEventProducer extends EventProducer
68 {
69 /*** the dijkstra semaphore, value is adjusted in the model class */
70 public transient static DijkstraSemaphore DIJKSTRASEMAPHORE = new DijkstraSemaphore(
71 1);
72
73 /*** the serial version uid */
74 private static final long serialVersionUID = 13L;
75
76 /*** the source field of an event */
77 private static Field sourceField = null;
78
79 static
80 {
81 try
82 {
83 ThreadedEventProducer.sourceField = ClassUtil.resolveField(
84 Event.class, "source");
85 sourceField.setAccessible(true);
86 } catch (Exception exception)
87 {
88 Logger.severe(ThreadedEventProducer.class, "<clinit>", exception);
89 }
90 }
91
92 /*** this thread executes events */
93 private transient NotifyThread notifyThread = null;
94
95 /*** the list with events to fire */
96 private transient List events = Collections
97 .synchronizedList(new ArrayList());
98
99 /***
100 * constructs a new ThreadedEventProducer
101 */
102 public ThreadedEventProducer()
103 {
104 super();
105 this.notifyThread = new NotifyThread(this);
106
107
108
109
110
111 Thread thread = new Thread(null, this.notifyThread, "", 1);
112 thread.start();
113 }
114
115 /***
116 * @see nl.tudelft.simulation.event.EventProducer#fireEvent(nl.tudelft.simulation.event.EventListenerInterface,
117 * nl.tudelft.simulation.event.EventInterface)
118 */
119 protected synchronized EventInterface fireEvent(
120 final EventListenerInterface listener, final EventInterface event)
121 throws RemoteException
122 {
123 this.notifyThread.add(listener, event);
124 return event;
125 }
126
127
128
129
130 /***
131 * writes a serializable method to stream
132 *
133 * @param out the outputstream
134 * @throws IOException on IOException
135 */
136 private synchronized void writeObject(final ObjectOutputStream out)
137 throws IOException
138 {
139 out.defaultWriteObject();
140
141 out.writeObject(new Integer(ThreadedEventProducer.DIJKSTRASEMAPHORE
142 .getMaximumCount()));
143 }
144
145 /***
146 * reads a serializable method from stream
147 *
148 * @param in the inputstream
149 */
150 private synchronized void readObject(final java.io.ObjectInputStream in)
151 {
152 try
153 {
154 in.defaultReadObject();
155 Integer maximumCount = (Integer) in.readObject();
156
157
158 this.events = Collections.synchronizedList(new ArrayList());
159
160
161 ThreadedEventProducer.DIJKSTRASEMAPHORE
162 .setMaximumCount(maximumCount.intValue());
163
164 this.notifyThread = new NotifyThread(this);
165 Thread thread = new Thread(null, this.notifyThread, "", 1);
166 thread.start();
167 } catch (Exception exception)
168 {
169 Logger.severe(this, "readObject", exception);
170 }
171 }
172
173 /***
174 * A combination of a reference and an event.
175 * <p>
176 * (c) copyright 2005 <a href="http://www.simulation.tudelft.nl">Delft
177 * University of Technology </a>, the Netherlands. <br>
178 * See for project information <a
179 * href="http://www.simulation.tudelft.nl">www.simulation.tudelft.nl </a>
180 * <br>
181 *
182 * Copyright (c) 2003-2005 Delft University of Technology, Jaffalaan 5, 2628
183 * BX Delft, the Netherlands. All rights reserved.
184 *
185 * See for project information <a href="http://www.simulation.tudelft.nl/">
186 * www.simulation.tudelft.nl </a>.
187 *
188 * The source code and binary code of this software are proprietary
189 * information of Delft University of Technology.
190 *
191 * @author <a
192 * href="http://www.tbm.tudelft.nl/webstaf/stijnh/index.htm">Stijn-Pieter
193 * van Houten </a>
194 * @version $Revision: 1.1 $ $Date: 2005/06/16 12:33:58 $
195 * @since 1.1.3
196 */
197 private class EventReference implements Serializable
198 {
199 /*** the serial version uid */
200 private static final long serialVersionUID = 11L;
201
202 /*** the reference */
203 private EventListenerInterface reference;
204
205 /*** the event */
206 private EventInterface event;
207
208 /***
209 * constructs a new EventReference
210 *
211 * @param event the event
212 * @param reference the reference
213 */
214 public EventReference(final EventInterface event,
215 final EventListenerInterface reference)
216 {
217 this.event = event;
218 this.reference = reference;
219 }
220
221 /***
222 * @return Returns the event.
223 *
224 */
225 public EventInterface getEvent()
226 {
227 return this.event;
228 }
229
230 /***
231 * @return Returns the reference.
232 *
233 */
234 public EventListenerInterface getReference()
235 {
236 return this.reference;
237 }
238 }
239
240 /***
241 * The NotifyThread fires the event and notifies a client-side remote
242 * object.
243 * <p>
244 * Copyright (c) 2003-2005 Delft University of Technology, Jaffalaan 5, 2628
245 * BX Delft, the Netherlands. All rights reserved.
246 *
247 * See for project information <a href="http://www.simulation.tudelft.nl/">
248 * www.simulation.tudelft.nl </a>.
249 *
250 * The source code and binary code of this software is proprietary
251 * information of Delft University of Technology. *
252 *
253 * @author <a
254 * href="http://www.tbm.tudelft.nl/webstaf/stijnh/index.htm">Stijn-Pieter
255 * van Houten </a>
256 * @author <a
257 * href="http://www.tbm.tudelft.nl/webstaf/alexandv/index.htm">Alexander
258 * Verbraeck </a>
259 * @version $Revision: 1.1 $ $Date: 2005/06/16 12:33:58 $
260 * @since 1.0.0
261 */
262 private class NotifyThread implements Runnable
263 {
264 /*** the number of events, used for performance measuring */
265 private int number = 0;
266
267 /*** the counter for the total number of events */
268 private int counter = 0;
269
270 /*** the size of the content of events in kb sent to remote clients */
271 private int size = 0;
272
273 /***
274 * indicates whether to measure performance (for output stream size;
275 * slows down the simulation)
276 */
277 private boolean performance = false;
278
279 /*** the owner of the thread */
280 private ThreadedEventProducer owner = null;
281
282 /*** the set with dead references */
283 private Set deadReferences = new HashSet();
284
285 /***
286 * constructs a new NotifyThread
287 *
288 * @param owner the owner
289 */
290 public NotifyThread(final ThreadedEventProducer owner)
291 {
292 super();
293 this.owner = owner;
294 }
295
296 /***
297 * Method add.
298 *
299 * @param reference the reference
300 * @param event the event
301 */
302 public void add(final EventListenerInterface reference,
303 final EventInterface event)
304 {
305 if (!this.deadReferences.contains(reference))
306 {
307 this.owner.events.add(new EventReference(event, reference));
308 if (this.performance)
309 {
310 this.number++;
311 this.counter++;
312
313 this.size += ObjectMeter.getObjectSize(event.getContent());
314 if (this.counter % 10 == 0)
315 {
316 System.out.println("ThreadedEventProducer: Number: "
317 + this.number);
318 System.out.println("ThreadedEventProducer: Size: "
319 + this.size / 1024 + " kb");
320 }
321 }
322 } else
323 {
324
325 this.owner.removeListener(reference, event.getType());
326 }
327 }
328
329 /***
330 * @see java.lang.Runnable#run()
331 */
332 public void run()
333 {
334 while (true)
335 {
336 try
337 {
338 if (this.owner.events.size() > 0)
339 {
340
341
342
343 ThreadedEventProducer.DIJKSTRASEMAPHORE.acquire();
344 {
345 while (this.owner.events.size() > 0)
346 {
347 try
348 {
349 EventReference er = (EventReference) this.owner.events
350 .remove(0);
351 if (this.performance)
352 {
353 this.number--;
354 this.counter++;
355 if (this.counter % 10 == 0)
356 {
357 System.out
358 .println("ThreadedEventProducer: Number: "
359 + this.number);
360 System.out
361 .println("ThreadedEventProducer: Size: "
362 + this.size
363 / 1024
364 + " kb");
365 }
366 }
367 if (!this.deadReferences.contains(er
368 .getReference()))
369 {
370 try
371 {
372
373
374 ThreadedEventProducer.sourceField
375 .set(er.getEvent(),
376 this.owner);
377
378 er.getReference().notify(
379 er.getEvent());
380 } catch (ConnectException connectException)
381 {
382 if (this.performance)
383 {
384 System.out
385 .println("ThreadedEventProducer: Reference died: "
386 + er
387 .getReference()
388 + " event: "
389 + er.getEvent());
390 }
391 this.deadReferences.add(er
392 .getReference());
393
394
395
396 this.owner.removeListener(er
397 .getReference(), er
398 .getEvent().getType());
399 }
400 }
401 } catch (Exception exception)
402 {
403 nl.tudelft.simulation.logger.Logger.severe(
404 this, "run", exception);
405 }
406 }
407 }
408 ThreadedEventProducer.DIJKSTRASEMAPHORE.release();
409 }
410 try
411 {
412
413
414 Thread.sleep(1, 0);
415 } catch (InterruptedException interruptedException)
416 {
417 nl.tudelft.simulation.logger.Logger.severe(this, "run",
418 interruptedException);
419 }
420 } catch (Exception exception)
421 {
422 ThreadedEventProducer.DIJKSTRASEMAPHORE.release();
423 nl.tudelft.simulation.logger.Logger.severe(this, "run",
424 exception);
425 }
426 }
427 }
428 }
429 }