...

Code Block
languagejava
    @Override
    public void processInOpen() throws IOException {        
        if (!preload) {
            final File file = new File(filename);
            try {
                in = new FileInputStream(file);
                fireOnConnect();
            } catch (Exception e) {
                fireOnDisconnect();
                throw e;
            }
        } else {
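            // Preload: read the whole file into direct buffers before connecting.
            fis = new FileInputStream(filename);
            FileChannel channel = fis.getChannel();
            long remaining = channel.size();
            // A ByteBuffer holds at most Integer.MAX_VALUE bytes, so large files are split.
            int n = (int) Math.ceil(remaining / (double) Integer.MAX_VALUE);
            ByteBuffer[] buffers = new ByteBuffer[n];
            for (int i = 0; i < n; i++) {
                // Allocate only as much as this chunk needs.
                int chunk = (int) Math.min(remaining, Integer.MAX_VALUE);
                buffers[i] = ByteBuffer.allocateDirect(chunk);
                // A single read() may return early, so loop until the chunk is full or EOF.
                while (buffers[i].hasRemaining() && channel.read(buffers[i]) >= 0) {
                    // keep reading
                }
                // flip() limits the buffer to the bytes actually read.
                buffers[i].flip();
                remaining -= chunk;
            }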
            in = createInputStream(buffers);
            fireOnConnect();
        }
    }

    @Override
    public void processInClose() throws IOException {
        super.processInClose();
        if (fis != null) {
            fis.close();
        }
    }

    @Override
    public void processOutOpen() throws IOException {        
        final File file = new File(filename);
        try {
            out = new FileOutputStream(file, append);
            fireOnConnect();
        } catch (Exception e) {
            fireOnDisconnect();
            throw e;
        }
    }

    @Override
    public void processOutClose() throws IOException {
        fireOnDisconnect();
        out.flush();
        out.close();
    }

 Important: Use processInStart if the transport handler connects to a service that immediately sends data!
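
The preload branch above passes its buffers to createInputStream, which is not shown on this page. As a rough illustration of what such a helper could look like, the following sketch reads several ByteBuffers one after another through a single InputStream. The class name MultiBufferInputStream and its structure are assumptions made for this example, not the actual Odysseus implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not Odysseus API): reads sequentially from several ByteBuffers. */
public class MultiBufferInputStream extends InputStream {

    private final ByteBuffer[] buffers;
    private int current = 0; // index of the buffer currently being drained

    public MultiBufferInputStream(ByteBuffer... buffers) {
        this.buffers = buffers;
    }

    /** Skips drained buffers; returns false once every buffer is empty. */
    private boolean advance() {
        while (current < buffers.length && !buffers[current].hasRemaining()) {
            current++;
        }
        return current < buffers.length;
    }

    @Override
    public int read() throws IOException {
        if (!advance()) {
            return -1; // end of stream
        }
        return buffers[current].get() & 0xFF; // mask so the byte is returned unsigned
    }

    @Override
    public int read(byte[] dst, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (!advance()) {
            return -1; // end of stream
        }
        // Copy at most what is left in the current buffer; callers loop for more.
        int n = Math.min(len, buffers[current].remaining());
        buffers[current].get(dst, off, n);
        return n;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer a = ByteBuffer.wrap("Hello, ".getBytes(StandardCharsets.UTF_8));
        ByteBuffer b = ByteBuffer.wrap("Odysseus".getBytes(StandardCharsets.UTF_8));
        try (InputStream in = new MultiBufferInputStream(a, b)) {
            // readAllBytes() requires Java 9 or later
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```

The stream relies on each buffer's position and limit, so every buffer has to be prepared for reading (position at the start, limit at the end of the data) before it is handed over.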



Generic Pull

After the connection is initialized, the framework tries to retrieve data from the TransportHandler. To stay generic, we decided to use an InputStream for sources and an OutputStream for sinks. The following methods therefore need to be overridden (note: it is not necessary to implement both methods if the TransportHandler is, for example, only used for sources):

...

Alternatively, a transport handler can be less generic and deliver e.g. tuples directly, as the next example shows. In this case, the protocol handler must be 'none'.

 


Code Block
languagejava
package de.uniol.inf.is.odysseus.wrapper.temper1.physicaloperator.access;


import java.util.Map;
import java.util.Random;

import de.uniol.inf.is.odysseus.core.collection.Tuple;
import de.uniol.inf.is.odysseus.core.physicaloperator.access.protocol.IProtocolHandler;
import de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.AbstractSimplePullTransportHandler;
import de.uniol.inf.is.odysseus.core.physicaloperator.access.transport.ITransportHandler;

public class RandomTransportHandler extends AbstractSimplePullTransportHandler<Tuple<?>>{

    private static final String NAME = "Random";
    private static final Random RAND = new Random();
    
    @Override
    public ITransportHandler createInstance(IProtocolHandler<?> protocolHandler, Map<String, String> options) {
        RandomTransportHandler tHandler = new RandomTransportHandler();
        
        protocolHandler.setTransportHandler(tHandler);
        
        return tHandler;
    }

    @Override
    public String getName() {
        return NAME;
    }
    
    private static float readDevice() {
        return 20f + ( 10 * RAND.nextFloat());
    }

    @Override
    public boolean hasNext() {
        return true;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public Tuple<?> getNext() {
        Tuple<?> tuple = new Tuple(1, false);
        tuple.setAttribute(0, readDevice());
        return tuple;
    }
    
    @Override
    public boolean isSemanticallyEqualImpl(ITransportHandler other) {
        return false;
    }

}

 

 



Generic Push

In generic push scenarios for sources, there is no method to override, because receiving data depends on the transport type and, for example, on the libraries that receive data from external sources. The information that is read must be sent to the corresponding transport handler. To simplify the process, AbstractTransportHandler(Delegate) provides the following methods that should be used:

...

These methods are defined in the interface ITransportHandlerListener, which is implemented by IProtocolHandler, the base interface for protocol handlers.
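
To make the push direction more concrete, here is a minimal, self-contained sketch of the general pattern: an external callback delivers raw data, and the source forwards it to every registered listener. All names in it (PushSketch, DataListener, onData, PushSource, received) are hypothetical stand-ins chosen for this example. They are not the Odysseus methods referred to above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PushSketch {

    /** Hypothetical stand-in for the listener role that a protocol handler plays. */
    interface DataListener {
        void onData(ByteBuffer message);
    }

    /** Hypothetical push source: receives data from outside and forwards it. */
    static class PushSource {
        private final List<DataListener> listeners = new ArrayList<>();

        void addListener(DataListener listener) {
            listeners.add(listener);
        }

        /** Would be called by the external library whenever new data arrives. */
        void received(byte[] payload) {
            for (DataListener listener : listeners) {
                // Each listener gets its own read-only view of the payload.
                listener.onData(ByteBuffer.wrap(payload).asReadOnlyBuffer());
            }
        }
    }

    /** Pushes two messages through the source and returns what the listener saw. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        PushSource source = new PushSource();
        source.addListener(msg -> seen.add(StandardCharsets.UTF_8.decode(msg).toString()));
        source.received("21.5".getBytes(StandardCharsets.UTF_8));
        source.received("22.0".getBytes(StandardCharsets.UTF_8));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [21.5, 22.0]
    }
}
```

The point of the pattern is that the source never polls: whatever thread the external library uses to deliver data also drives the listeners.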


Registering the handler

Odysseus is OSGi-based, and all handlers are implemented as declarative services.

...