...
Code Block | ||
---|---|---|
| ||
public static final String NAME = "File";
@Override
public String getName() {
    return NAME;
} |
When implementing ITransportHandler, open and close need to be implemented.
Hint: In the following we assume that AbstractTransportHandler is extended.
AbstractTransportHandler already provides default implementations that cannot be overridden (they are implemented in AbstractTransportHandlerDelegate):
Code Block | ||
---|---|---|
| ||
final synchronized public void open() throws UnknownHostException,
        IOException {
    if (openCounter == 0) {
        if (getExchangePattern() != null
                && (getExchangePattern().equals(ITransportExchangePattern.InOnly)
                        || getExchangePattern().equals(ITransportExchangePattern.InOptionalOut)
                        || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
            callOnMe.processInOpen();
        }
        if (getExchangePattern() != null
                && (getExchangePattern().equals(ITransportExchangePattern.OutOnly)
                        || getExchangePattern().equals(ITransportExchangePattern.OutOptionalIn)
                        || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
            callOnMe.processOutOpen();
        }
    }
    openCounter++;
}
final synchronized public void close() throws IOException {
    openCounter--;
    if (openCounter == 0) {
        if (getExchangePattern() != null
                && (getExchangePattern().equals(ITransportExchangePattern.InOnly)
                        || getExchangePattern().equals(ITransportExchangePattern.InOptionalOut)
                        || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
            callOnMe.processInClose();
        }
        if (getExchangePattern() != null
                && (getExchangePattern().equals(ITransportExchangePattern.OutOnly)
                        || getExchangePattern().equals(ITransportExchangePattern.OutOptionalIn)
                        || getExchangePattern().equals(ITransportExchangePattern.InOut))) {
            callOnMe.processOutClose();
        }
    }
} |
The corresponding methods:
- processInOpen()
- processOutOpen()
- processInClose()
- processOutClose()
are called by the framework. In these methods, the TransportHandler must open or close its connections. The "IN" methods are called for sources, the "OUT" methods for sinks. open and close are called when a query is started or stopped, respectively.
The following again shows the implementations for the FileHandler:
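The open counter and the dispatch on the exchange pattern can be tried out in isolation. The following is only a minimal sketch and not the framework code: `ExchangePattern`, `CountingHandler` and `OpenCounterSketch` are hypothetical stand-ins, and the hooks are merely recorded in a list instead of opening real connections.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ITransportExchangePattern (only the values used on this page).
enum ExchangePattern { InOnly, OutOnly, InOptionalOut, OutOptionalIn, InOut }

// Hypothetical, simplified handler: records which hooks open()/close() would trigger.
class CountingHandler {
    private final ExchangePattern pattern;
    private int openCounter = 0;
    final List<String> calls = new ArrayList<>();

    CountingHandler(ExchangePattern pattern) {
        this.pattern = pattern;
    }

    // True for patterns that read data (sources).
    private boolean isIn() {
        return pattern == ExchangePattern.InOnly
                || pattern == ExchangePattern.InOptionalOut
                || pattern == ExchangePattern.InOut;
    }

    // True for patterns that write data (sinks).
    private boolean isOut() {
        return pattern == ExchangePattern.OutOnly
                || pattern == ExchangePattern.OutOptionalIn
                || pattern == ExchangePattern.InOut;
    }

    public final synchronized void open() {
        if (openCounter == 0) { // only the first open triggers the hooks
            if (isIn()) calls.add("processInOpen");
            if (isOut()) calls.add("processOutOpen");
        }
        openCounter++;
    }

    public final synchronized void close() {
        openCounter--;
        if (openCounter == 0) { // only the last close triggers the hooks
            if (isIn()) calls.add("processInClose");
            if (isOut()) calls.add("processOutClose");
        }
    }
}

class OpenCounterSketch {
    public static void main(String[] args) {
        CountingHandler source = new CountingHandler(ExchangePattern.InOnly);
        source.open();  // first open: processInOpen
        source.open();  // counter is incremented only
        source.close(); // counter drops to 1: nothing is closed yet
        source.close(); // counter drops to 0: processInClose
        System.out.println(source.calls); // [processInOpen, processInClose]
    }
}
```

With `InOut`, a single open/close pair would record all four hooks, because both conditions match.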
Code Block | ||
---|---|---|
| ||
@Override
public void processInOpen() throws IOException {
    if (!preload) {
        // Read directly from the file
        final File file = new File(filename);
        try {
            in = new FileInputStream(file);
            fireOnConnect();
        } catch (Exception e) {
            fireOnDisconnect();
            throw e;
        }
    } else {
        // Preload: read the file content into direct buffers first
        fis = new FileInputStream(filename);
        FileChannel channel = fis.getChannel();
        long size = channel.size();
        // A single ByteBuffer holds at most Integer.MAX_VALUE bytes
        double x = size / (double) Integer.MAX_VALUE;
        int n = (int) Math.ceil(x);
        ByteBuffer buffers[] = new ByteBuffer[n];
        for (int i = 0; i < n; i++) {
            buffers[i] = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
            channel.read(buffers[i]);
            buffers[i].rewind();
        }
        in = createInputStream(buffers);
        fireOnConnect();
    }
}
@Override
public void processInClose() throws IOException {
    super.processInClose();
    if (fis != null) {
        fis.close();
    }
}
@Override
public void processOutOpen() throws IOException {
    final File file = new File(filename);
    try {
        out = new FileOutputStream(file, append);
        fireOnConnect();
    } catch (Exception e) {
        fireOnDisconnect();
        throw e;
    }
}
@Override
public void processOutClose() throws IOException {
    fireOnDisconnect();
    out.flush();
    out.close();
} |
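The preload branch of processInOpen splits the file into several buffers because a single ByteBuffer can hold at most Integer.MAX_VALUE bytes. The following sketch only illustrates that arithmetic. `PreloadChunks`, `bufferCount` and `bufferCapacity` are hypothetical names. `bufferCount` uses the same formula as the FileHandler, whereas `bufferCapacity` is an assumption about how a buffer could be sized to the bytes it actually has to hold. The FileHandler above always allocates Integer.MAX_VALUE per buffer.

```java
class PreloadChunks {

    // Number of buffers needed for a file of the given size (same formula as in processInOpen).
    static int bufferCount(long size) {
        return (int) Math.ceil(size / (double) Integer.MAX_VALUE);
    }

    // Assumption, not FileHandler behavior: capacity of buffer i if it were sized to its content.
    static int bufferCapacity(long size, int i) {
        long remaining = size - (long) i * Integer.MAX_VALUE;
        return (int) Math.min(remaining, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024; // 3221225472 bytes
        System.out.println(bufferCount(threeGiB));       // 2
        System.out.println(bufferCapacity(threeGiB, 0)); // 2147483647
        System.out.println(bufferCapacity(threeGiB, 1)); // 1073741825
    }
}
```

For an empty file, `bufferCount` returns 0, so no buffer would be allocated at all.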
Source-Generic Pull
Source-Generic Push
Sink-Generic Pull
Sink-Generic Push
...