Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
    final synchronized public void open() throws UnknownHostException,
            IOException {
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.InOptionalOut) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInOpen();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.OutOptionalIn) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutOpen();
            }
        }
        openCounter++;
    }
    
    final synchronized public void close() throws IOException {
        openCounter--;
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.InOptionalOut) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInClose();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.OutOptionalIn) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutClose();
            }
        }
    }

The corresponding methods:

  • processInOpen()
  • processOutOpen()
  • processInClose()
  • processOutClose()

are called from the framework. In this methods the TransportHandler must open or close the connections. The "IN"-methods are called for sources, the "OUT" for sinks. When starting or stopping a query, open and close are called respectively.

In the following again the implemenations for the FileHandler

A TransportHandler can provide different exchange pattern. The handler must deliver the pattern when calling the following method:

Code Block
languagejava
    public ITransportExchangePattern getExchangePattern();

Currently, the following values are available (TODO: Extend description of remaining pattern):

  • InOnly: The handler can only be used as source.
  • RobustInOnly
  • InOut: The handler can be used as source and as sink.
  • InOptionalOut
  • OutOnly: The handler can only be used as sink.
  • RobustOutOnly
  • OutIn
  • OutOptionalIn

AbstractTransportHandler calls according to the exchange pattern the corresponding methods:

  • processInOpen()
  • processOutOpen()
  • processInClose()
  • processOutClose()

In this methods the TransportHandler must open or close the connections. The "IN"-methods are called for sources, the "OUT" for sinks. When starting or stopping a query, open and close are called respectively.

In the following again the implemenations for the FileHandler

Code Block
languagejava
    @Override
    public void processInOpen() throws IOException {        
Code Block
languagejava
    @Override
    public void processInOpen() throws IOException {        
        if (!preload) {
            final File file = new File(filename);
            try {
                in = new FileInputStream(file);
                fireOnConnect();
            } catchif (Exception e!preload) {
            final File file = new fireOnDisconnectFile(filename);
                throw e;try {
            }
    in = new FileInputStream(file);
 } else {
            fis = new FileInputStreamfireOnConnect(filename);
            FileChannel} channelcatch = fis.getChannel();
(Exception e) {
             long size = channel.sizefireOnDisconnect();
            double x = size / (double) Integer.MAX_VALUEthrow e;
            int}
 n = (int) Math.ceil(x);
    } else {
      ByteBuffer buffers[]      fis = new ByteBuffer[n];
 FileInputStream(filename);
            FileChannel channel = fis.getChannel();
            long size      for (int i = 0; i < n; i++) {
                buffers[i] = ByteBuffer.allocateDirect(Integer.MAX_VALUE);= channel.size();
            double x = size / (double) Integer.MAX_VALUE;
            int n = (int) Math.ceil(x);
            ByteBuffer buffers[] = new ByteBuffer[n];
            for (int i = 0; i < n; i++) {
                buffers[i] = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
                channel.read(buffers[i]);
                buffers[i].rewind();
            }
            in = createInputStream(buffers);
            fireOnConnect();
        }
    }

    @Override
    public void processInClose() throws IOException {
        super.processInClose();
        if (fis != null) {
            fis.close();
        }
    }

    @Override
    public void processOutOpen() throws IOException {        
        final File file = new File(filename);
        try {
            out = new FileOutputStream(file, append);
            fireOnConnect();
        } catch (Exception e) {
            fireOnDisconnect();
            throw e;
        }
    }

    @Override
    public void processOutClose() throws IOException {
        fireOnDisconnect();
        out.flush();
        out.close();
    }

 

Generic Pull

After the connection is inialized, the framework tries to retrieve data from the TransportHandler. To be generic, we decided to use an InputStream for sources and an OutputStream for sinks. So the following methods need to be overwritten (Remark: It its not necessary to implement both methods, if the TransportHandler e.g. should only be used for sources):

Code Block
languagejava
public InputStream getInputStream();
public OutputStream getOutputStream();

A typical implementation in FileHandler:

Code Block
languagejava
    @Override
    public InputStream getInputStream() {
        return in;
    }
    @Override
    public OutputStream getOutputStream() {
        return out;
    }

In some cases, sources deliver not an endless data stream. For such cases the method

Code Block
languagejava
 public boolean isDone();

can be overwrittten.

GenericPush

In generic push szenarios for sources there is no method that can be overwritten because it depends on the transport type and e.g. libararies that receive data from external sources. The information that is read must be send to the corresponding transport handler. To simplify the process, AbstractTransportHandler(Delegate) provides the following methods that should be used:

Code Block
languagejava
    public void fireProcess(ByteBuffer message) {
        for (ITransportHandlerListener<T> l :     channel.read(buffers[i]);
         transportHandlerListener) {
       buffers[i].rewind();
     // TODO: flip() erases the contents of the }message if
            in// = createInputStream(buffers);
     it was already flipped or just created...
       fireOnConnect();
     // In other }
words: This method expects }

 that the byte @Overridebuffer
    public void processInClose() throws IOException {
   // is not fully  super.processInClose();
prepared
         if (fis != null) { message.flip();
            fisl.closeprocess(message);
        }
    }

    @Override
    public void processOutOpen() throws IOException {
    public void fireProcess(T m) {
        finalfor File(ITransportHandlerListener<T> filel =: new File(filename);transportHandlerListener) {
        try {    l.process(m);
        }    out = new FileOutputStream(file, append);
    }
    public void fireProcess(String[]  fireOnConnect();
message) {
        for }(ITransportHandlerListener<T> catchl (Exception: etransportHandlerListener) {
            fireOnDisconnectl.process(message);
        }
    throw e;}
    public void fireOnConnect(ITransportHandler handler) }{
    }

    @Override
for (ITransportHandlerListener<T> l : publictransportHandlerListener) void{
 processOutClose() throws IOException {
        fireOnDisconnectl.onConnect(handler);
        out.flush();}
    }
    public void out.close();
    }

A TransportHandler can provide different exchange pattern. The handler must deliver the pattern when calling the following method:

Code Block
languagejava
fireOnDisconnect(ITransportHandler handler) {
       public ITransportExchangePatternfor getExchangePattern();

Currently, the following values are available:

InOnly
RobustInOnly
InOut
InOptionalOut
OutOnly
RobustOutOnly
OutIn
OutOptionalIn

 

Generic Pull

ITransportHandlerListener<T> l : transportHandlerListener) {
            l.onDisonnect(handler);
        }
    }

The fireProcess methods can be used with ByteBuffers and String-Arrays or with a Generic. In the latter case, the corresponding ProtocolHandler must read this type, else a class cast exception will be thrown. Two additional methods are used to inform listener about connection states (connect and disconnect)After the connection is inialized, the framework tries to retrieve data from the TransportHandler. To be generic, we decided to use an InputStream for sources and an OutputStream for sinks. So the following methods need to be overwritten (Remark: It its not necessary to implement both methods, if the TransportHandler e.g. should only be used for sources):

Code Block
languagejava
    public InputStreamvoid getInputStreamfireOnConnect(ITransportHandler handler);
public OutputStream getOutputStream();

A typical implementation in FileHandler:

Code Block
languagejava
 {
        for (ITransportHandlerListener<T> l : transportHandlerListener) {
      @Override
    public InputStream getInputStreaml.onConnect(handler) {;
        return in;}
    }
    public  @Overridevoid fireOnDisconnect(ITransportHandler handler) {
    public    OutputStreamfor getOutputStream(ITransportHandlerListener<T> l : transportHandlerListener) {
            return out;
l.onDisonnect(handler);
        }

In some cases, sources deliver not an endless data stream. For such cases the method

Code Block
languagejava
 public boolean isDone();

can be overwrittten.

 

Source-Generic Push

Sink-Generic Pull

Sink-Generic Push

...


    }

 

 

 

OSGi, Registering, Declarative service

...