Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Remember the special behevior, if you reuse an existing operator. If there is already an operator that reads from the database, a second operator would join this reading - and not beginning from the start of the table. 

Since the reading from the database is done as fast as possible, you possibly want to slow down this. An optional paramter indicates the number of milliseconds that the operator should wait between each tuple that is read. For example, for reading each second (=1000 milliseconds):

Code Block
datastream := DATABASESOURCE({connection='con1', table='main', waiteach=1000})
Writing to a database

You can write a stream into a database by using the DATABASESINK operator. So, if you have for example a stream or an operator that is named "datastream", the DATABASESINK can be used as follows:

...

This makes it possible to create a new table with the according schema. Alternatively, you may just truncate (remove all entries but keep the table and its schema) the table:

 

Code Block
result = DATABASESINK({connection='con1', table='main', truncate='true'}, datastream)

...

Using a connection in StreamSQL (CQL)

Reading from a database

To read from a database with StreamSQL, you first have to create a stream, which is similar to other CREATE-STREAM queries:

Code Block
CREATE STREAM datastream(id INTEGER, value STRING) DATABASE con1 TABLE main

This uses connection con1 to open a connection to table main and reads its data. The given schema (in this case id integer and value string) must be compatible with table main. This means that the table main should have the attributes id and value, which should have suitable datatypes.

Like in PQL, reading from a database is done as fast as possible. So, if you want to slow down this, you can use the "each" parameter so that the reading waits a certain time between two reads. So if you want to read a tuple each second, you can use the following syntax:

Code Block
CREATE STREAM datastream(id INTEGER, value STRING) DATABASE con1 TABLE main EACH 1 SECOND

The time specification (in this example 1 SECOND) is equal to the specification of time windows, so you may also use other time intervall like "200 MILLISECONDS", "10 SECONDS", "5 MINUTES" or even "1 WEEK"

After creating the stream you can access the stream as usual, e.g.:

Code Block
SELECT * FROM datastream WHERE...
Writing to a database

For writing into a database, you have to create a sink (according to create a source via "create stream")

Code Block
CREATE SINK dbsink AS DATABASE con1 TABLE example

This query uses connection con1 to create a sink that writes data into the table example and is called dbsink. If the table example does not exist, it will be created. However, if there is already a table called "example", you can drop the table, so that a new table is created each time:

Code Block
CREATE SINK dbsink AS DATABASE con1 TABLE example AND DROP

Alternatively, you may just truncate (remove all entries but keep the table and its schema) the table:

Code Block
CREATE SINK dbsink AS DATABASE con1 TABLE example AND TRUNCATE

Now, you can use the stream-to statement to write the outgoing stream of an usual select statement into the sink:

Code Block
STREAM TO dbsink SELECT * FROM samplestream

Remember the union compatibility, because the outgoing schema of "SELECT * FROM samplestream" that is written into "dbsink" must be union-compatible with the table of "dbsink", which is the schema of example in this case.