Work in progress!

 

Although a large set of wrappers is already available for Odysseus, there is often a need to create a new adapter for a specific source. To ease the creation of new wrappers, we created an adapter framework.

...

  • Transport: This handler is the physical bridge between the external systems and Odysseus. It is responsible for the communication, e.g. file access, a TCP client/server, or a message bus.
    Hint: There are scenarios in which it is not feasible to separate the transport and protocol layers. In such cases, one can implement the combination as a transport handler and use it together with the "None" protocol handler.
  • Protocol: While the transport handler handles the connections, the protocol handler is responsible for interpreting the input coming from the transport handler, i.e. the protocol handler translates the incoming data into a format that Odysseus understands.
  • DataHandler: Finally, the internal format can be defined. This is the data type used by the internal operators of Odysseus. There are different kinds of operators for different kinds of data types. Examples of data handlers are Tuple and KeyValue.
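The division of labour between the three layers can be illustrated with a minimal, self-contained sketch. All names in it (`SketchTransport`, `SketchProtocol`, `SketchDataHandler`, `SourcePipelineSketch`) are hypothetical stand-ins invented for this example and are not the Odysseus interfaces. The sketch also assumes a line-based, comma-separated input, which is only one of many possible protocols.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the three layers; NOT the Odysseus API.
interface SketchTransport { InputStream open() throws IOException; }
interface SketchProtocol { List<String> readRecords(InputStream in) throws IOException; }
interface SketchDataHandler<T> { T convert(String record); }

class SourcePipelineSketch {
    // Transport: an in-memory byte source standing in for a file or socket.
    static SketchTransport inMemoryTransport(String content) {
        return () -> new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
    }

    // Protocol: interprets the raw bytes as newline-separated records.
    static SketchProtocol lineProtocol() {
        return in -> {
            List<String> records = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    records.add(line);
                }
            }
            return records;
        };
    }

    // Data handler: turns one record into the internal format (a list of fields).
    static SketchDataHandler<List<String>> csvTupleHandler() {
        return record -> Arrays.asList(record.split(","));
    }

    // Wires the layers together: transport -> protocol -> data handler.
    static List<List<String>> run(String content) throws IOException {
        List<List<String>> tuples = new ArrayList<>();
        InputStream in = inMemoryTransport(content).open();
        for (String record : lineProtocol().readRecords(in)) {
            tuples.add(csvTupleHandler().convert(record));
        }
        return tuples;
    }
}
```

Each layer can be exchanged independently here: replacing `inMemoryTransport` with a file-based transport would not affect the protocol or the data handler.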

...

The wrapper can also be used on the sink side, i.e. to send data from Odysseus to other systems. In this case, the information is sent from the data handler via the protocol handler to the transport handler. Each handler can support both directions, i.e. retrieving and sending data, but is not required to. So some handlers may only be usable in sources, while others only in sinks.
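The sink direction can be sketched in the same simplified way. The class `SinkPipelineSketch` and its methods are hypothetical and only illustrate the order in which the layers are traversed; a `ByteArrayOutputStream` stands in for a real transport such as a file or socket, and the comma-separated line format is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sink-side sketch; none of these names come from Odysseus.
class SinkPipelineSketch {
    // Data handler, outgoing direction: internal tuple -> textual record.
    static String toRecord(List<String> tuple) {
        return String.join(",", tuple);
    }

    // Protocol, outgoing direction: frame each record as one line of UTF-8 text.
    static byte[] frame(String record) {
        return (record + "\n").getBytes(StandardCharsets.UTF_8);
    }

    // Transport, outgoing direction: any OutputStream (file, socket, ...).
    static void send(List<List<String>> tuples, OutputStream transport) throws IOException {
        for (List<String> tuple : tuples) {
            transport.write(frame(toRecord(tuple)));
        }
        transport.flush();
    }

    // Sends two tuples into an in-memory transport and returns what was written.
    static String demo() throws IOException {
        ByteArrayOutputStream transport = new ByteArrayOutputStream();
        send(Arrays.asList(Arrays.asList("1", "a"), Arrays.asList("2", "b")), transport);
        return new String(transport.toByteArray(), StandardCharsets.UTF_8);
    }
}
```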

Here are two sequence diagrams that show by example how the different handlers interoperate:

[Figure: two sequence diagrams (images not available)]


This document explains how to write new wrappers using this generic wrapper framework.

To create a new TransportHandler, either the interface ITransportHandler must be implemented or the class AbstractTransportHandler must be extended.

Depending on how the handler works, different methods need to be implemented.

Independent of Push/Pull

Code Block
languagejava
public ITransportHandler createInstance(IProtocolHandler<?> protocolHandler, Map<String, String> options);

This method must return a new, initialized transport handler. Typically, it simply calls the constructor.

Code Block
languagejava
titleExample of FileHandler
    @Override
    public ITransportHandler createInstance(
            IProtocolHandler<?> protocolHandler, Map<String, String> options) {
        return new FileHandler(protocolHandler, options);
    }

The method getName() must return a globally unique transport handler name.

Code Block
languagejava
    String getName();

A good way to implement this is as follows (again using FileHandler as the example):

Code Block
languagejava
    public static final String NAME = "File";

    @Override
    public String getName() {
        return NAME;
    }
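Why the name has to be globally unique, and why createInstance acts as a factory, becomes clearer with a small registry sketch. This is an assumption about how such a lookup could work, not the actual Odysseus registry: `NamedHandlerSketch`, `FileHandlerSketch` and `HandlerRegistrySketch` are hypothetical, and the real `ITransportHandler` has more methods and a different `createInstance` signature (it also receives the protocol handler).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced handler type; the real ITransportHandler is larger.
interface NamedHandlerSketch {
    String getName();
    NamedHandlerSketch createInstance(Map<String, String> options);
}

class FileHandlerSketch implements NamedHandlerSketch {
    static final String NAME = "File";
    final Map<String, String> options;

    // Prototype constructor, used only for registration.
    FileHandlerSketch() {
        this(new HashMap<>());
    }

    FileHandlerSketch(Map<String, String> options) {
        this.options = options;
    }

    @Override
    public String getName() {
        return NAME;
    }

    @Override
    public NamedHandlerSketch createInstance(Map<String, String> options) {
        return new FileHandlerSketch(options);
    }
}

// Hypothetical registry: one prototype per name, a fresh instance per request.
class HandlerRegistrySketch {
    private final Map<String, NamedHandlerSketch> prototypes = new HashMap<>();

    void register(NamedHandlerSketch prototype) {
        // A second handler with the same name would shadow the first one,
        // which is why names must be unique.
        if (prototypes.putIfAbsent(prototype.getName(), prototype) != null) {
            throw new IllegalStateException("Duplicate handler name: " + prototype.getName());
        }
    }

    NamedHandlerSketch create(String name, Map<String, String> options) {
        NamedHandlerSketch prototype = prototypes.get(name);
        if (prototype == null) {
            throw new IllegalArgumentException("Unknown handler: " + name);
        }
        return prototype.createInstance(options);
    }
}
```

Keeping the name in a public constant, as FileHandler does, lets other code refer to the handler without repeating the string literal.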

When implementing ITransportHandler, open and close need to be implemented.

Hint: In the following we assume that AbstractTransportHandler is extended.

AbstractTransportHandler already provides default implementations of open and close that cannot be overridden (they are implemented in AbstractTransportHandlerDelegate):

Code Block
languagejava
    final synchronized public void open() throws UnknownHostException,
            IOException {
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.InOptionalOut) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInOpen();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.OutOptionalIn) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutOpen();
            }
        }
        openCounter++;
    }
    
    final synchronized public void close() throws IOException {
        openCounter--;
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.InOptionalOut) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInClose();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.OutOptionalIn) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutClose();
            }
        }
    }

The corresponding methods:

  • processInOpen()
  • processOutOpen()
  • processInClose()
  • processOutClose()

are called by the framework. In these methods the TransportHandler must open or close its connections. The "In" methods are called for sources, the "Out" methods for sinks. When a query is started or stopped, open and close are called, respectively.
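The counting logic in open and close means that the process methods run only for the first open and for the matching last close, even if several queries share the same handler. The following stripped-down sketch shows this behaviour. It is not the Odysseus code: `CountingHandlerSketch` and `RecordingHandlerSketch` are hypothetical, and the exchange patterns are reduced to three values for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of the open/close counting; not the Odysseus classes.
abstract class CountingHandlerSketch {
    // Reduced set of exchange patterns (the optional variants are left out).
    enum Pattern { IN_ONLY, OUT_ONLY, IN_OUT }

    private final Pattern pattern;
    private int openCounter = 0;

    CountingHandlerSketch(Pattern pattern) {
        this.pattern = pattern;
    }

    // Only the first open() actually opens the connections.
    public final synchronized void open() {
        if (openCounter == 0) {
            if (pattern != Pattern.OUT_ONLY) { processInOpen(); }
            if (pattern != Pattern.IN_ONLY) { processOutOpen(); }
        }
        openCounter++;
    }

    // Only the close() that brings the counter back to zero closes them.
    public final synchronized void close() {
        openCounter--;
        if (openCounter == 0) {
            if (pattern != Pattern.OUT_ONLY) { processInClose(); }
            if (pattern != Pattern.IN_ONLY) { processOutClose(); }
        }
    }

    protected abstract void processInOpen();
    protected abstract void processOutOpen();
    protected abstract void processInClose();
    protected abstract void processOutClose();
}

// Records which hooks were invoked, in order, instead of opening real connections.
class RecordingHandlerSketch extends CountingHandlerSketch {
    final List<String> calls = new ArrayList<>();

    RecordingHandlerSketch(Pattern pattern) {
        super(pattern);
    }

    @Override protected void processInOpen() { calls.add("inOpen"); }
    @Override protected void processOutOpen() { calls.add("outOpen"); }
    @Override protected void processInClose() { calls.add("inClose"); }
    @Override protected void processOutClose() { calls.add("outClose"); }
}
```

With the `IN_ONLY` pattern, calling `open()` twice and `close()` twice records `inOpen` once and `inClose` once; the out hooks are never called.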

The following shows the corresponding implementations in FileHandler:

Code Block
languagejava
    @Override
    public void processInOpen() throws IOException {        
        if (!preload) {
            final File file = new File(filename);
            try {
                in = new FileInputStream(file);
                fireOnConnect();
            } catch (Exception e) {
                fireOnDisconnect();
                throw e;
            }
        } else {
            fis = new FileInputStream(filename);
            FileChannel channel = fis.getChannel();
            long size = channel.size();
            double x = size / (double) Integer.MAX_VALUE;
            int n = (int) Math.ceil(x);
            ByteBuffer buffers[] = new ByteBuffer[n];
            for (int i = 0; i < n; i++) {
                buffers[i] = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
                channel.read(buffers[i]);
                buffers[i].rewind();
            }
            in = createInputStream(buffers);
            fireOnConnect();
        }
    }

    @Override
    public void processInClose() throws IOException {
        super.processInClose();
        if (fis != null) {
            fis.close();
        }
    }

    @Override
    public void processOutOpen() throws IOException {        
        final File file = new File(filename);
        try {
            out = new FileOutputStream(file, append);
            fireOnConnect();
        } catch (Exception e) {
            fireOnDisconnect();
            throw e;
        }
    }

    @Override
    public void processOutClose() throws IOException {
        fireOnDisconnect();
        out.flush();
        out.close();
    }
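The preload branch of processInOpen reads the whole file into memory before any data is delivered, so later reads do not hit the disk. The idea can be sketched in a much simpler form. Note that this sketch swaps in a different technique: it uses `Files.readAllBytes` and a `ByteArrayInputStream` instead of the direct `ByteBuffer` array used by FileHandler, so it only works for files that fit into a single byte array. `PreloadSketch` and `openInput` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; uses a byte[] rather than FileHandler's direct ByteBuffers.
class PreloadSketch {
    static InputStream openInput(Path file, boolean preload) throws IOException {
        if (preload) {
            // Read everything up front; subsequent reads are served from memory.
            byte[] content = Files.readAllBytes(file);
            return new ByteArrayInputStream(content);
        }
        // Without preload, the file is streamed lazily from disk.
        return Files.newInputStream(file);
    }
}
```

The trade-off is memory for latency: preloading makes the time per read more predictable, which can matter for benchmarks, but the whole file has to fit into memory.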

 

Generic Pull

After the connection is initialized, the framework tries to retrieve data from the TransportHandler. To stay generic, we decided to use an InputStream for sources and an OutputStream for sinks. So the following methods need to be overridden (Remark: it is not necessary to implement both methods if the TransportHandler should, e.g., only be used for sources):

Code Block
languagejava
public InputStream getInputStream();
public OutputStream getOutputStream();

A typical implementation in FileHandler:

Code Block
languagejava
    @Override
    public InputStream getInputStream() {
        return in;
    }
    @Override
    public OutputStream getOutputStream() {
        return out;
    }
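How a consumer might pull data from the stream returned by getInputStream() can be sketched as follows. This is only an illustration of pull semantics under the assumption of line-based text input; `LinePullerSketch` is a hypothetical class and does not reproduce how the Odysseus protocol handlers are actually implemented.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

// Hypothetical pull-style reader on top of an InputStream; not an Odysseus class.
class LinePullerSketch {
    private final BufferedReader reader;
    private String nextLine;

    LinePullerSketch(InputStream in) {
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    // Pull semantics: nothing is read from the stream until the caller asks.
    boolean hasNext() throws IOException {
        if (nextLine == null) {
            nextLine = reader.readLine();
        }
        return nextLine != null;
    }

    // Returns the next line and clears the look-ahead buffer.
    String next() throws IOException {
        if (!hasNext()) {
            throw new NoSuchElementException("end of stream");
        }
        String line = nextLine;
        nextLine = null;
        return line;
    }
}
```

A caller would typically loop with `while (puller.hasNext()) { process(puller.next()); }`, where `process` stands for whatever the next layer does with a record.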

 

Source-Generic Push

Sink-Generic Pull

Sink-Generic Push

 

 

 

 

OSGi, Registering, Declarative service

Creating a new Protocol Handler

 

 

 

 

...