...

The "Database Feature", which is not part of the default Odysseus installation, must be installed first. See also How to install new features

There are two ways to use a database connection. On the one hand, you can create a persistent connection and reuse it in other queries. On the other hand, you can directly create a connection for each query. First, we show how to create and drop persistent connections and how to use them in PQL and StreamSQL (CQL). Afterwards, we show how to directly set up a connection using PQL. As there are currently some problems with the CQL database parts, we suggest using PQL.


Table of Contents

Direct Database Connections

With PQL you can also use direct connections. There are two ways to create them: preconfigured drivers and JDBC strings. Both work the same way for DATABASESOURCE and DATABASESINK.

Direct reading with preconfigured driver

If you want to use a preconfigured DBMS (see also the section above), you can use one of the existing drivers (at the moment "mysql", "postgresql" and "oracle"). In addition to the type, you have to define at least the database you want to connect to. Furthermore, you can (or in some cases have to) use all other parameters of DATABASESOURCE (see above), but you don't need the "connection" parameter for the connection name. So, if you want to read the table "main" from a MySQL database called "test", the query looks like this:

Code Block
datastream = DATABASESOURCE({table='main', type='mysql', db='test', connection="con1"})

The host implicitly defaults to localhost and the user to root, so you may - according to the create-database-connection statement - also define additional parameters, e.g. the user, the password, the host or the port:

Code Block
datastream = DATABASESOURCE({table='main', type='mysql', db='test', user='dbuser', password='dbpassword', host='localhost', port=3306, connection="con1"})

For the optional attributes parameter, which is also available here, see "Reading from a database" for PQL above. Without attributes, this query also fetches the schema from the database, which opens a connection during the translation of the query.

Direct reading with JDBC

You can also use a JDBC string instead of a preconfigured driver, for example, if you want to use special properties:

Code Block
datastream = DATABASESOURCE({table='main', jdbc='jdbc:mysql://localhost:3306/test', connection="con1"})

While using the jdbc parameter, you can also use the user and password parameter for setting up the credentials:

Code Block
datastream = DATABASESOURCE({table='main', jdbc='jdbc:mysql://localhost:3306/test', user='dbuser', password='dbpassword', connection="con1"})

Direct writing with preconfigured driver

The DATABASESINK operator works very similarly to DATABASESOURCE: you also have to use at least the parameters "type" and "db" instead of the "connection" parameter for the connection name. For example, you can directly write the output of "otherops" into the table "main":

Code Block
datastream = DATABASESINK({table='main', type='mysql', db='test', connection="con1"}, otherops)

Of course, you can also use the drop or truncate parameters of DATABASESINK:

Code Block
datastream = DATABASESINK({table='main', drop='true', type='mysql', db='test', connection="con1"}, otherops)

Furthermore, as with the source, there are also additional parameters for user, password, host and port:

Code Block
datastream = DATABASESINK({table='main', type='mysql', db='test', user='dbuser', password='dbpassword', host='localhost', port=3306, connection="con1"}, otherops)

Direct writing with JDBC

You can also use a JDBC string instead of a preconfigured driver, for example, if you want to use special properties. For writing the stream "otherops" into the table "main":

Code Block
datastream = DATABASESINK({table='main', jdbc='jdbc:mysql://localhost:3306/test', connection="con1"}, otherops)

While using the jdbc parameter, you can also use the user and password parameter for setting up the credentials:

Code Block
datastream = DATABASESINK({table='main', jdbc='jdbc:mysql://localhost:3306/test', user='dbuser', password='dbpassword', connection="con1"}, otherops)

Using a connection in PQL

In PQL you can reuse an existing database connection by referring to its name with the connection parameter (e.g. connection="con1").

Reading from a database

If you want to read from a database, you may use the DATABASESOURCE operator. For an existing connection called "con1" we can read from the table called "main":

Code Block
datastream = DATABASESOURCE({connection='con1', table='main'})

This query tries to fetch the schema from the table main. If the schema in a MySQL database has, for example, the attributes "id, value" with the datatypes "int, varchar", this is mapped to an output schema "id, value" with the datatypes "integer, string" in Odysseus. If a database has a special datatype that is not known to JDBC (see java.sql.Types), it is mapped to the Odysseus datatype "Object". Since fetching the schema from the database opens a connection, you can alternatively define the schema explicitly by using the optional "attributes" parameter:

Code Block
datastream = DATABASESOURCE({connection='con1', table='main', attributes=[['id', 'INTEGER'],['val', 'STRING']]})

Since PQL allows you to create views and names for reusing connections (see The Odysseus Procedural Query Language (PQL) Framework), you may use this syntax, for example, to create a dedicated source entry.

Code Block
datastream := DATABASESOURCE({connection='con1', table='main'})

Then you can use this in other queries, like in the following example:

Code Block
example = PROJECT({attributes=['val']}, datastream)

Remember the special behavior when reusing an existing operator: if there is already an operator that reads from the database, a second operator joins this reading instead of starting again from the beginning of the table.
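For example, the following sketch (operator names are illustrative) attaches two queries to the same named source; both share a single reading of the table, so the second query only sees tuples read after it was started:

Code Block
datastream := DATABASESOURCE({connection='con1', table='main'})
first = PROJECT({attributes=['id']}, datastream)
second = PROJECT({attributes=['val']}, datastream)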

Since reading from the database is done as fast as possible, you may want to slow it down. The optional "waiteach" parameter indicates the number of milliseconds that the operator should wait between each tuple that is read. For example, for reading one tuple each second (= 1000 milliseconds):

Code Block
datastream := DATABASESOURCE({connection='con1', table='main', waiteach=1000})

Writing to a database

You can write a stream into a database by using the DATABASESINK operator. So, if you have for example a stream or an operator that is named "datastream", the DATABASESINK can be used as follows:

Code Block
result = DATABASESINK({connection='con1', table='main'}, datastream)

This query opens connection con1 and writes the resulting data stream from the view/source datastream to the table main. This is, of course, plain PQL, so you may also use other operators instead of "datastream". If the table "main" does not exist, it is created first. Furthermore, notice that the schema of the table main and the schema of the incoming "datastream" must be union-compatible.

Since each tuple is just appended to an existing table, you can also drop the table when the query is started:

Code Block
result = DATABASESINK({connection='con1', table='main', drop='true'}, datastream)

This makes it possible to create a new table with the according schema. Alternatively, you may just truncate (remove all entries but keep the table and its schema) the table:

Code Block
result = DATABASESINK({connection='con1', table='main', truncate='true'}, datastream)

If the schema of the target table does not match the inputs of the operator, you can provide a prepared statement with the attribute names of the table.

Code Block
result = DATABASESINK({connection='con1', table='main', preparedStatement='INSERT INTO main (Att1, Att2, Att3) values(?,?,?)'}, datastream)

See https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html for further examples of how to use prepared statements. The order of the attributes is simply the order of the schema in the input of the operator. So it is easily possible to update values as in the following example. In this case, the first input attribute must be the new salary and the second attribute the id (you can use a Project or Map operator to reorganize your input):

Code Block
result = DATABASESINK({connection='con1', table='EMPLOYEES', preparedStatement='UPDATE EMPLOYEES SET SALARY = ? WHERE ID = ?'}, datastream)
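If the input does not already deliver the attributes in this order, a Project operator can reorder them before the sink. A sketch, assuming the input stream "datastream" carries attributes named SALARY and ID:

Code Block
reordered = PROJECT({attributes=['SALARY', 'ID']}, datastream)
result = DATABASESINK({connection='con1', table='EMPLOYEES', preparedStatement='UPDATE EMPLOYEES SET SALARY = ? WHERE ID = ?'}, reordered)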

Datatype Mapping

This part introduces the concept of datatype mapping between the relational database system and Odysseus. Since you have to define a union-compatible schema between your database system and Odysseus (e.g. if you want to read a table without fetching the schema), the different definitions of datatypes may be confusing, so this section gives a short introduction.

Odysseus has its own datatypes (see Data Types for details) like Integer or String, while, e.g., MySQL uses int and varchar for the same types. Furthermore, the JDBC driver sits between the database and Odysseus and uses yet another set of datatypes. So, Odysseus has a mapping that maps all (default) datatypes (called SDFDatatype) to appropriate datatypes from JDBC (see java.sql.Types). The following table shows how Odysseus maps a database datatype to an Odysseus datatype:

JDBC Types      SDFDatatypes
Array           Object
BigInt          Long
Binary          Long
Bit             Boolean
Blob            Object
Boolean         Boolean
Char            String
Clob            Object
Datalink        Object
Date            Date
Decimal         Integer
Distinct        Object
Double          Double
Float           Float
Integer         Integer
Java Object     Object
Long NVarChar   String
Long VarBinary  Long
Long VarChar    String
NChar           String
NClob           Object
Null            Object
Numeric         Double
NVarChar        String
Other           Object
Real            Float
Ref             Object
RowId           Object
SmallInt        Integer
SQLXML          String
Struct          Object
Time            Timestamp
Timestamp       Timestamp
TinyInt         Integer
VarBinary       Long
VarChar         String


You may see, for example, that a TINYINT of JDBC is mapped to an Integer in Odysseus. Therefore, if the DATABASESOURCE reads a table with the datatypes BIGINT and VARCHAR, these datatypes are mapped to LONG and STRING in Odysseus. Since this mapping is used when reading from the database, there is also a mapping for the other direction, describing how Odysseus transforms its datatypes into JDBC datatypes for writing data:

SDFDatatypes    JDBC Types
Boolean         Boolean
Byte            Binary
Date            Date
Double          Double
EndTimestamp    BigInt
Float           Float
Integer         Integer
Long            BigInt
Object          Object
Point in Time   BigInt
StartTimestamp  BigInt
String          VarChar
Timestamp       Timestamp


For example, if the DATABASESINK operator creates a new table to insert data, it uses Types.BIGINT for a LONG or for any timestamp type.

Although most of the usual datatypes are mapped comprehensibly, you may want to look up how the specific JDBC driver maps the DBMS-specific datatypes to JDBC datatypes. For Oracle, for example, see: http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/table8.7.html
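Both tables are relevant when you define a schema explicitly with the attributes parameter: choose the Odysseus datatypes that the first table maps the column types to. A sketch (column names are illustrative) for a table with a BIGINT and a VARCHAR column:

Code Block
datastream = DATABASESOURCE({connection='con1', table='main', attributes=[['id', 'LONG'],['name', 'STRING']]})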

Supported databases

The following databases are supported:

  • MariaDB
  • MySQL
  • PostgreSQL
  • Derby
  • Oracle

CQL has some problems at the moment, so please use PQL to create DB connections.

Open and drop persistent database connections

...

Open a connection using preconfigured drivers

2017-11-22: Important: The database feature does not contain the CQL parts anymore. If you want to use them, you will need to install the "database.cql" feature after the "database" feature!

There are different drivers for different DBMSs (see the list of supported databases) that can be used with a more comfortable syntax. For example, the following query creates a connection with the name con1 to a MySQL server on localhost (this is an implicit setting of the driver) with a database called test:

...

Code Block
CREATE DATABASE CONNECTION con1b AS oracle TO test
CREATE DATABASE CONNECTION con1c AS postgresql TO test

Note for Oracle: the last token ("test" in this example) is the SID.

You may also define the host and/or the port you want to connect to:

...

Code Block
DROP DATABASE CONNECTION conname


Using a connection in StreamSQL (CQL)

Reading from a database

To read from a database with StreamSQL, you first have to create a stream, which is similar to other CREATE STREAM queries:

Code Block
CREATE STREAM datastream(id INTEGER, value STRING) DATABASE con1 TABLE main

This uses connection con1 to open a connection to table main and reads its data. The given schema (in this case id integer and value string) must be compatible with table main. This means that the table main should have the attributes id and value, which should have suitable datatypes.

Like in PQL, reading from a database is done as fast as possible. If you want to slow it down, you can use the "each" parameter so that the reading waits a certain time between two reads. So, if you want to read a tuple each second, you can use the following syntax:

Code Block
CREATE STREAM datastream(id INTEGER, value STRING) DATABASE con1 TABLE main EACH 1 SECOND

The time specification (in this example 1 SECOND) is equal to the specification of time windows, so you may also use other time intervals like "200 MILLISECONDS", "10 SECONDS", "5 MINUTES" or even "1 WEEK".
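For example, to read one tuple every 200 milliseconds from the stream defined above:

Code Block
CREATE STREAM datastream(id INTEGER, value STRING) DATABASE con1 TABLE main EACH 200 MILLISECONDS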

After creating the stream you can access the stream as usual, e.g.:

Code Block
SELECT * FROM datastream WHERE...
Writing to a database

For writing into a database, you have to create a sink (analogous to creating a source via CREATE STREAM):

Code Block
CREATE SINK dbsink AS DATABASE con1 TABLE example

This query uses connection con1 to create a sink called dbsink that writes data into the table example. If the table example does not exist, it will be created. However, if there is already a table called "example", you can drop it so that a new table is created each time:

Code Block
CREATE SINK dbsink AS DATABASE con1 TABLE example AND DROP

Alternatively, you may just truncate (remove all entries but keep the table and its schema) the table:

Code Block
CREATE SINK dbsink AS DATABASE con1 TABLE example AND TRUNCATE

Now, you can use the STREAM TO statement to write the outgoing stream of an ordinary select statement into the sink:

Code Block
STREAM TO dbsink SELECT * FROM samplestream

Remember the union compatibility: the outgoing schema of "SELECT * FROM samplestream" that is written into "dbsink" must be union-compatible with the table of "dbsink", which in this case is the schema of example.