
It's easy to create your own generator.

All generators are in the repository Odysseus Applications / Applications.

The abstract generator can be found in "de.uniol.inf.is.odysseus.generator" and offers a minimal data structure and interfaces that can be used to create a data tuple and transmit it to Odysseus via a TCP-based byte stream. Since everything needed for the transfer already exists, only the generation of the data tuples has to be implemented. This is done by the following steps:

  • Create a new bundle, e.g. called "example", so that you have a bundle called "de.uniol.inf.is.odysseus.generator.example".
  • Under "Dependencies" of the MANIFEST.MF, add the package "de.uniol.inf.is.odysseus.generator" to the imported packages.
  • Then, you can create your generator as follows:

Create the Generator

  • Create a new class (e.g. ExampleDataProvider) and extend the abstract class AbstractDataGenerator, so you get the following code:

    package de.uniol.inf.is.odysseus.generator.example;
    
    import java.util.List;
    
    import de.uniol.inf.is.odysseus.generator.AbstractDataGenerator;
    import de.uniol.inf.is.odysseus.generator.DataTuple;
    
    public class ExampleDataProvider extends AbstractDataGenerator {
    
    	@Override
    	public void process_init() {
    		// Initialize your generators
    	}
    
    	@Override
    	public void close() {
    		//  Shutdown your generators
    	}
    
    	@Override
    	public List<DataTuple> next() {
    		// Generate next value
    		return null;
    	}
    
    	@Override
    	public ExampleDataProvider newCleanInstance() {
    		// Create a new clean instance
    		return null;
    	}
    
     }
    

The newCleanInstance method must be implemented and should return a new instance of the object!

process_init

This method is invoked once at the beginning. Here you can, e.g. open a file reader.

close

This method is called once at the end, e.g. when Odysseus closes the connection. Here, a file reader may be closed.

next

  • This method is called continuously in an endless loop after the socket connection is established.
    • Since there is no pause in the loop, you may use Thread.sleep to avoid overloading the system.
  • The return type must be a list of DataTuple.
    • DataTuple offers a simple structure for a relational tuple. You can extend a DataTuple by calling addAttribute(...). The Java type of the object passed to addAttribute determines how the value is transferred as bytes. Therefore, a value that Odysseus should read as a long must be passed as a "Long", not as an "Integer".
      Creating a data tuple may look like the following example:


      ...
      DataTuple tuple = new DataTuple();
      tuple.addAttribute(Long.valueOf(time));
      tuple.addAttribute(Integer.valueOf(transId));
      tuple.addAttribute("example text");
      tuple.addAttribute(76);
      ...
      
    • In this example, a tuple is created that has a long, an integer, a string and an integer (Java auto-boxing is used for the last one).
    • Since the return type is a list, it is possible to generate a batch of such DataTuple objects and put them into a list (e.g. LinkedList or ArrayList).
    • You may also use the value-generators (see below) to generate alternating, increasing or other values, including errors and bursts.
  • If the method next() returns null, the socket connection is closed and next() is not called again.
  • If a DataTuple is created as in the previous example, it should match the schema in Odysseus. So, if you use StreamSQL (aka CQL) in Odysseus, you may define the following schema (STARTTIMESTAMP values in Odysseus are Long objects):

    CREATE STREAM exampleStream (timestamp STARTTIMESTAMP, transaction_id INTEGER, name STRING, value INTEGER) ....
    
  • The order, number and types of the attributes that are used when creating a DataTuple (in this case long, integer, string, integer) should not change over time, because Odysseus expects a fixed schema. If the data generator changes this implicit schema, the other side of the connection (which is Odysseus) can no longer interpret the data correctly.
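
To illustrate why the attribute types matter for a byte-based transfer, here is a minimal, self-contained sketch. It does not use the Odysseus classes: TupleEncodingSketch and its wire layout (8-byte long, 4-byte int, 8-byte double, length-prefixed UTF-8 string) are assumptions made for illustration and not the actual Odysseus protocol.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in, not part of Odysseus: encodes attributes by their runtime type.
final class TupleEncodingSketch {

    // Writes each attribute with a width that depends on its Java type.
    static byte[] encode(List<Object> attributes) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for (Object attribute : attributes) {
            if (attribute instanceof Long) {
                buffer.putLong((Long) attribute);       // 8 bytes
            } else if (attribute instanceof Integer) {
                buffer.putInt((Integer) attribute);     // 4 bytes
            } else if (attribute instanceof Double) {
                buffer.putDouble((Double) attribute);   // 8 bytes
            } else if (attribute instanceof String) {
                byte[] text = ((String) attribute).getBytes(StandardCharsets.UTF_8);
                buffer.putInt(text.length);             // assumed 4-byte length prefix
                buffer.put(text);
            } else {
                throw new IllegalArgumentException("Unsupported type: " + attribute.getClass());
            }
        }
        // Copy the written bytes into an array of exactly the right size
        byte[] result = new byte[buffer.position()];
        buffer.flip();
        buffer.get(result);
        return result;
    }

    public static void main(String[] args) {
        long time = 1L;
        int transId = 7;
        // Matches (STARTTIMESTAMP, INTEGER, STRING, INTEGER)
        List<Object> matching = Arrays.<Object>asList(time, transId, "example text", 76);
        // Same values, but the timestamp is passed as an Integer
        List<Object> mismatched = Arrays.<Object>asList((int) time, transId, "example text", 76);
        System.out.println(encode(matching).length);
        System.out.println(encode(mismatched).length);
    }
}
```

Under these assumptions, the first tuple takes 32 bytes and the second only 28, so a receiver that expects a long at the first position would read all following attributes at the wrong offsets.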

Start Server for Generators

  • A generator (the class that is derived from StreamClientHandler) can be started by a StreamServer.
  • Normally, you can create a new StreamServer in the Activator of the bundle and pass the generator as a parameter of the constructor to the StreamServer. Additionally, you have to provide the port (e.g. 54321). This may look as follows:

    @Override
    public void start(BundleContext bundleContext) throws Exception {
        Activator.context = bundleContext;
        StreamServer server = new StreamServer(54321, new ExampleDataProvider());
        server.start();
    }
    

Finally, this bundle must be started by the launch-config. If your generator bundle does not start, you may uncheck the option "only start bundle if one of its classes is loaded" in the plugin.xml. And don't forget to add the required bundles (generator-base and org.eclipse.osgi) to the run-config.

Optional: Print Throughput

You can optionally pass an integer parameter called "printThroughputEach" to the constructor of the StreamServer. It enables the calculation of the throughput and prints the throughput every printThroughputEach times. Additionally, the stats are printed at the end (e.g. if the stream server is interrupted or the connection is closed).
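
The idea behind such a parameter can be sketched with a simple counter. The class ThroughputSketch below is a hypothetical stand-in and not the StreamServer implementation; it assumes that the throughput is reported after every printThroughputEach tuples.

```java
// Hypothetical stand-in, not the StreamServer implementation:
// counts tuples and reports the throughput every `printThroughputEach` tuples.
final class ThroughputSketch {
    private final int printThroughputEach;
    private final long startNanos;
    private long count = 0;
    private int reports = 0;

    ThroughputSketch(int printThroughputEach, long startNanos) {
        this.printThroughputEach = printThroughputEach;
        this.startNanos = startNanos;
    }

    // Call once per sent tuple; returns true if a report was printed.
    boolean tupleSent(long nowNanos) {
        count++;
        if (count % printThroughputEach != 0) {
            return false;
        }
        double seconds = (nowNanos - startNanos) / 1_000_000_000.0;
        double perSecond = seconds > 0 ? count / seconds : 0.0;
        System.out.println(count + " tuples sent, " + perSecond + " tuples/s");
        reports++;
        return true;
    }

    // Number of reports printed so far.
    int reports() {
        return reports;
    }

    public static void main(String[] args) {
        ThroughputSketch stats = new ThroughputSketch(100, System.nanoTime());
        for (int i = 0; i < 250; i++) {
            stats.tupleSent(System.nanoTime());
        }
        // Reports are printed after 100 and after 200 tuples
    }
}
```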

Optional: Give Name

You can optionally pass a string that simply gives the StreamServer a name. This is helpful, for example, if you have started the same StreamClientHandler twice (or more often) on different ports and want to distinguish the console outputs.

Use Predefined Value-Generators

The generator framework also provides several generators to create the values. This allows you, for example, to create alternating, increasing, constant, random or other values. Additionally, you can add an error model that pollutes the generated values with random or continuous errors. Furthermore, there is also a burst-error mode to add noise like jitter or duplicates.

See Available Generators for a list.

Example

In the following you can see an example for a wind-data-generator that should show how to use these value-generators.

public class WindGenerator extends AbstractDataGenerator{
    //CREATE STREAM wind (timestamp LONG, bft INTEGER, wind_speed DOUBLE, wind_direction INTEGER, location INTEGER) CHANNEL localhost : 54321;

    private ISingleValueGenerator time;
    private ISingleValueGenerator beaufort;
    private ISingleValueGenerator speed;
    private ISingleValueGenerator direction;
    private ISingleValueGenerator location;

    @Override
    public void process_init() {
        // Assign to the fields (no local declarations), so that next() can use them
        // Time
        time = new IncreaseGenerator(new NoError(), 0, 1);
        time.init();
        // Beaufort
        beaufort = new UniformDistributionGenerator(new ContinuousErrorModel(new DuplicateNoise(), 3), 2, 30);
        beaufort.init();
        // Speed
        speed = new PredifinedValueGenerator(101, 164, 454, 324, 145, 123, 241, 232, 322);
        speed.init();
        // Direction
        direction = new ConstantValueGenerator(new BurstErrorModel(new JitterNoise(50), 10, 4), 182);
        direction.init();
        // Location: starts at 10, step 2, between 0 and 20
        location = new AlternatingGenerator(new NoError(), 10, 2, 0, 20);
        location.init();
    }
    @Override
    public List<DataTuple> next() {
        DataTuple tuple = new DataTuple();
        // number / time (long)
        tuple.addLong(time.nextValue());
        // bft (integer)
        tuple.addInteger(beaufort.nextValue());
        // wind speed (double)
        tuple.addDouble(speed.nextValue());
        // wind direction (integer)
        tuple.addInteger(direction.nextValue());
        // location (integer)
        tuple.addInteger(location.nextValue());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List<DataTuple> list = new ArrayList<DataTuple>();
        list.add(tuple);
        return list;
    }
    
    
    @Override
    public void close() {        
        
    }
    @Override
    public StreamClientHandler newCleanInstance() {
        return new WindGenerator();
    }
}

We have five value generators for time, beaufort, speed, direction and location, each defined as a private field.

Then, the value-generators are set up in the process_init method. The time is an IncreaseGenerator with no error model (NoError). It starts at 0, and each time the generator is called, 1 is added to the current value. Thus, this generator creates the following values: 0, 1, 2, 3, 4, 5, 6....

The beaufort is a UniformDistributionGenerator that creates random (uniformly distributed) values from 2 to 30. Additionally, the generator has a ContinuousErrorModel that adds noise at a fixed interval. In this example, it applies a DuplicateNoise to every 3rd generated value. The duplicate noise itself simply doubles the generated value. Thus, if the UniformDistributionGenerator creates values like 4, 23, 12, 11, 21, 20, 6, 5, 3..., the ContinuousErrorModel changes every 3rd value by using the DuplicateNoise, and the final generated values are: 4, 23, 24, 11, 21, 40, 6, 5, 6...
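
The combination of a continuous error model and a duplicate noise can be sketched without the framework. ContinuousDuplicateSketch is a hypothetical stand-in that only mirrors the behavior described above (every n-th value is doubled); it uses its own input values and is not the Odysseus implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in: doubles every `interval`-th value, as described for
// a continuous error model combined with a duplicate noise.
final class ContinuousDuplicateSketch {

    // Returns a copy of `values` in which every `interval`-th value (1-based) is doubled.
    static List<Integer> pollute(List<Integer> values, int interval) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            int value = values.get(i);
            boolean noisy = (i + 1) % interval == 0;
            result.add(noisy ? value * 2 : value);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> raw = Arrays.asList(5, 10, 15, 20, 25, 30);
        // Prints [5, 10, 30, 20, 25, 60]
        System.out.println(pollute(raw, 3));
    }
}
```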

The speed is a PredifinedValueGenerator, which simply creates the given values 101, 164, 454, 324, 145, 123, 241, 232, 322 and starts again from the beginning after 322.

The direction is a ConstantValueGenerator that generates a constant value (in this case 182). However, it also has a BurstErrorModel that, in contrast to the ContinuousErrorModel, does not add noise to a single value but rather to a certain number of consecutive values. In this example, each burst covers 4 values and is applied every 10 elements. The affected values are polluted with a JitterNoise that adds a random variation of 50, so that they fluctuate between 157 and 207. The generated values may, for example, look like this: 182, 182, 182, 182, 182, 182, 182, 182, 182, 161, 202, 193, 158, 182, 182, 182, 182, 182, 182, 182, 182, 182, 182, 163,...

The location is an AlternatingGenerator that has no error model. It produces values between a min and a max value by increasing and decreasing the value by a certain step. This example starts with 10 and adds 2 until it reaches 20. Then it subtracts 2 until it reaches 0. This leads to the following values: 10, 12, 14, 16, 18, 20, 18, 16, 14, 12, 10, 8, 6, 4, 2, 0, 2, 4, 6, 8, 10, 12, 14, ....
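
The alternating behavior can be reproduced with a few lines of plain Java. AlternatingSketch is a hypothetical stand-in written for this page; its parameter order (start, step, min, max) is an assumption and it is not the AlternatingGenerator of the framework.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: moves between min and max in fixed steps and
// reverses the direction at the bounds.
final class AlternatingSketch {
    private final double step;
    private final double min;
    private final double max;
    private double current;
    private boolean increasing = true;

    AlternatingSketch(double start, double step, double min, double max) {
        this.current = start;
        this.step = step;
        this.min = min;
        this.max = max;
    }

    // Returns the current value and then advances by one step.
    double nextValue() {
        double value = current;
        if (increasing && current + step > max) {
            increasing = false;   // upper bound reached, go down
        } else if (!increasing && current - step < min) {
            increasing = true;    // lower bound reached, go up
        }
        current += increasing ? step : -step;
        return value;
    }

    // Collects the first n values, converted to int like an integer attribute.
    static List<Integer> firstValues(double start, double step, double min, double max, int n) {
        AlternatingSketch generator = new AlternatingSketch(start, step, min, max);
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            values.add((int) generator.nextValue());
        }
        return values;
    }

    public static void main(String[] args) {
        // Prints [10, 12, 14, 16, 18, 20, 18, 16, 14, 12, 10, 8, 6, 4, 2, 0, 2, 4]
        System.out.println(firstValues(10, 2, 0, 20, 18));
    }
}
```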


Finally, you can use the value generators by calling their nextValue method in the next() method. Since the value-generators only produce double values, you may have to convert them into other data types, e.g. into an integer by using the addInteger method of DataTuple!

That's it.

PS: We also have added a sleep of 1000 ms so that the values are only generated each second.

Comments and Hints

  • It is also possible to create more than one data provider (generator), and you can also start the same implementation on different ports by using the parameters of the StreamServer.
  • For each new connection to the StreamServer, a new instance of the StreamClientHandler (or, in our case, the concrete ExampleDataProvider) is created by using the newCleanInstance method.
    • This allows, e.g., managing more than one connection at the same time.
    • At each new connection, the data generator starts from the beginning. If you don't want this behavior, you have to keep the state yourself!
    • During tests in Odysseus, the data generator can be kept running, and each reconnect of Odysseus restarts the test case.
    • The generator does not have to be part of Odysseus (or run in the same OSGi container), so the generator may run on its own machine.
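
The life cycle described on this page (a clean instance per connection, process_init once, next() in a loop until it returns null, then close) can be summarized in a small sketch. All names in it (ConnectionLoopSketch, Generator, Counter, serveOneConnection) are hypothetical stand-ins and not the Odysseus classes; a real server would write the tuples to a socket instead of collecting them in a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ConnectionLoopSketch {

    // Hypothetical stand-in for the generator contract described on this page.
    interface Generator {
        void processInit();
        List<Long> next();            // returning null ends the stream
        void close();
        Generator newCleanInstance();
    }

    // Produces 0, 1, ..., limit - 1, one value per call, and then null.
    static final class Counter implements Generator {
        private final long limit;
        private long current;

        Counter(long limit) {
            this.limit = limit;
        }

        public void processInit() {
            current = 0;
        }

        public List<Long> next() {
            if (current >= limit) {
                return null;
            }
            List<Long> batch = Collections.singletonList(current);
            current++;
            return batch;
        }

        public void close() {
            // nothing to release in this sketch
        }

        public Generator newCleanInstance() {
            return new Counter(limit);
        }
    }

    // What a server might do for one incoming connection.
    static List<Long> serveOneConnection(Generator prototype) {
        Generator generator = prototype.newCleanInstance();
        List<Long> sent = new ArrayList<>();
        generator.processInit();
        try {
            List<Long> batch;
            while ((batch = generator.next()) != null) {
                sent.addAll(batch);   // a real server would write to the socket here
            }
        } finally {
            generator.close();
        }
        return sent;
    }

    public static void main(String[] args) {
        Generator prototype = new Counter(3);
        // Each connection starts from the beginning: prints [0, 1, 2] twice
        System.out.println(serveOneConnection(prototype));
        System.out.println(serveOneConnection(prototype));
    }
}
```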

