Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
	public static final String NAME = "File";
    
	@Override
    public String getName() {
        return NAME;
    }

When implementing ITransportHandler, open and close need to be implemented.

Hint: In the following we will assume, that AbstractTransportHandler will be overwritten.

AbstractTransportHandler provides already default implementations that cannot be overwritten (its implementend in AbstractTransportHandlerDelegate):

Code Block
languagejava
    final synchronized public void open() throws UnknownHostException,
            IOException {
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.InOptionalOut) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInOpen();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.OutOptionalIn) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutOpen();
            }
        }
        openCounter++;
    }
    
    final synchronized public void close() throws IOException {
        openCounter--;
        if (openCounter == 0) {
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.InOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.InOptionalOut) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processInClose();
            }
            if (getExchangePattern() != null
                    && (getExchangePattern().equals(
                            ITransportExchangePattern.OutOnly)
                            || getExchangePattern().equals(
                                    ITransportExchangePattern.OutOptionalIn) || getExchangePattern()
                            .equals(ITransportExchangePattern.InOut))) {
                callOnMe.processOutClose();
            }
        }
    }

The corresponding methods:

  • processInOpen()
  • processOutOpen()
  • processInClose()
  • processOutClose()

are called from the framework. In this methods the TransportHandler must open or close the connections. The "IN"-methods are called for sources, the "OUT" for sinks. When starting or stopping a query, open and close are called respectively.

In the following again the implemenations for the FileHandler

Code Block
languagejava
    @Override
    public void processInOpen() throws IOException {        
        if (!preload) {
            final File file = new File(filename);
            try {
                in = new FileInputStream(file);
                fireOnConnect();
            } catch (Exception e) {
                fireOnDisconnect();
                throw e;
            }
        } else {
            fis = new FileInputStream(filename);
            FileChannel channel = fis.getChannel();
            long size = channel.size();
            double x = size / (double) Integer.MAX_VALUE;
            int n = (int) Math.ceil(x);
            ByteBuffer buffers[] = new ByteBuffer[n];
            for (int i = 0; i < n; i++) {
                buffers[i] = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
                channel.read(buffers[i]);
                buffers[i].rewind();
            }
            in = createInputStream(buffers);
            fireOnConnect();
        }
    }

    @Override
    public void processInClose() throws IOException {
        super.processInClose();
        if (fis != null) {
            fis.close();
        }
    }

    @Override
    public void processOutOpen() throws IOException {        
        final File file = new File(filename);
        try {
            out = new FileOutputStream(file, append);
            fireOnConnect();
        } catch (Exception e) {
            fireOnDisconnect();
            throw e;
        }
    }

    @Override
    public void processOutClose() throws IOException {
        fireOnDisconnect();
        out.flush();
        out.close();
    }

 

 

 

Source-Generic PUll

 

 

Source-Generic Push

Sink-Generic Pull

Sink-Generic Push

...