...
Keep in mind the special behavior when you reuse an existing operator: if an operator already reads from the database, a second operator joins that ongoing read instead of starting again from the beginning of the table.
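The joining behavior described above can be pictured with a small Python sketch. It is only an illustration under stated assumptions, not how the operator is implemented: the class `SharedTableReader` and its methods `subscribe` and `step` are hypothetical names, and the table is a plain in-memory list.

```python
from typing import List, Tuple

Row = Tuple[int, str]


class SharedTableReader:
    """Hypothetical stand-in for one shared source operator over a table."""

    def __init__(self, rows: List[Row]) -> None:
        self._rows = rows
        self._position = 0  # index of the next row the operator will emit
        self._subscribers: List[List[Row]] = []

    def subscribe(self) -> List[Row]:
        # A new consumer starts with an empty buffer and only sees rows
        # that are emitted after it joined.
        buffer: List[Row] = []
        self._subscribers.append(buffer)
        return buffer

    def step(self) -> bool:
        # Emit the next row to every current subscriber; False when done.
        if self._position >= len(self._rows):
            return False
        row = self._rows[self._position]
        self._position += 1
        for buffer in self._subscribers:
            buffer.append(row)
        return True


table = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
reader = SharedTableReader(table)

first = reader.subscribe()
reader.step()
reader.step()
second = reader.subscribe()  # joins after two rows were already read
while reader.step():
    pass

print(first)   # every row of the table
print(second)  # only the rows read after the second consumer joined
```

The second consumer misses the rows that were read before it joined, which is the effect to expect when an existing source operator is reused.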
Because reading from the database runs as fast as possible, you may want to slow it down. The optional waiteach parameter sets the number of milliseconds the operator waits between two tuples. For example, to read one tuple per second (1000 milliseconds):
Code Block |
---|
datastream := DATABASESOURCE({connection='con1', table='main', waiteach=1000}) |
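As a rough picture of what a wait between tuples means, the following Python sketch throttles a reader with a fixed delay. The function `read_throttled` and its parameter `wait_each_ms` are hypothetical names chosen to mirror waiteach. Whether the real operator also waits before the first tuple is not stated here, so the sketch assumes it does not.

```python
import time
from typing import Iterable, Iterator, Tuple

Row = Tuple[int, str]


def read_throttled(rows: Iterable[Row], wait_each_ms: int = 0) -> Iterator[Row]:
    """Yield rows one by one, sleeping wait_each_ms between two rows."""
    first = True
    for row in rows:
        # Assumption: no wait before the very first row.
        if not first and wait_each_ms > 0:
            time.sleep(wait_each_ms / 1000.0)
        first = False
        yield row


sample_rows = [(1, "a"), (2, "b"), (3, "c")]

# A short delay keeps the demo quick; waiteach=1000 would correspond to
# wait_each_ms=1000, i.e. one row per second.
for row in read_throttled(sample_rows, wait_each_ms=100):
    print(row)
```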
Writing to a database
You can write a stream into a database with the DATABASESINK operator. For example, if you have a stream or an operator named "datastream", you can use DATABASESINK as follows:
...
This makes it possible to create a new table with the corresponding schema. Alternatively, you can truncate the table, which removes all entries but keeps the table and its schema:
Code Block |
---|
result = DATABASESINK({connection='con1', table='main', truncate='true'}, datastream) |
...
Using a connection in StreamSQL (CQL)
Reading from a database
To read from a database with StreamSQL, you first have to create a stream, which is similar to other CREATE-STREAM queries:
Code Block |
---|
CREATE STREAM datastream(id INTEGER, value STRING) DATABASE con1 TABLE main |
This uses connection con1 to access table main and read its data. The given schema (here id as INTEGER and value as STRING) must be compatible with table main: the table should have the attributes id and value, each with a suitable data type.
As in PQL, reading from a database runs as fast as possible. To slow it down, use the EACH parameter, which makes the reader wait a given time between two reads. To read one tuple per second, use the following syntax:
Code Block |
---|
CREATE STREAM datastream(id INTEGER, value STRING) DATABASE con1 TABLE main EACH 1 SECOND |
The time specification (here 1 SECOND) uses the same syntax as the specification of time windows, so you can also use other time intervals such as "200 MILLISECONDS", "10 SECONDS", "5 MINUTES" or even "1 WEEK".
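To relate these time specifications to the millisecond value used by the PQL waiteach parameter, a conversion could be sketched as follows. The function `to_milliseconds` and the table `UNIT_MS` are hypothetical helpers. The sketch covers only the units named above and does not claim to match the full grammar of time windows.

```python
# Milliseconds per unit; only the units mentioned in the text are listed.
UNIT_MS = {
    "MILLISECOND": 1,
    "SECOND": 1_000,
    "MINUTE": 60_000,
    "WEEK": 7 * 24 * 60 * 60 * 1_000,
}


def to_milliseconds(spec: str) -> int:
    """Convert a specification such as '5 MINUTES' into milliseconds."""
    amount_text, unit_text = spec.split()
    unit = unit_text.upper()
    if unit.endswith("S"):
        unit = unit[:-1]  # accept the plural form, e.g. SECONDS
    return int(amount_text) * UNIT_MS[unit]


for spec in ("1 SECOND", "200 MILLISECONDS", "10 SECONDS", "5 MINUTES", "1 WEEK"):
    print(spec, "=", to_milliseconds(spec), "ms")
```

Under this reading, EACH 1 SECOND describes the same pause as waiteach=1000 in the PQL example.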
After creating the stream you can access the stream as usual, e.g.:
Code Block |
---|
SELECT * FROM datastream WHERE... |
Writing to a database
To write into a database, you have to create a sink (analogous to creating a source with "create stream"):
Code Block |
---|
CREATE SINK dbsink AS DATABASE con1 TABLE example |
This query uses connection con1 to create a sink called dbsink that writes data into the table example. If the table example does not exist, it is created. If a table called "example" already exists, you can drop it so that a new table is created each time:
Code Block |
---|
CREATE SINK dbsink AS DATABASE con1 TABLE example AND DROP |
Alternatively, you can truncate the table, which removes all entries but keeps the table and its schema:
Code Block |
---|
CREATE SINK dbsink AS DATABASE con1 TABLE example AND TRUNCATE |
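The difference between dropping and truncating can be tried out directly on a database. The following Python sketch uses an in-memory SQLite database purely as an illustration. It does not show what the sink executes internally, and the extra column ts is a made-up example of a changed schema. SQLite has no TRUNCATE statement, so DELETE without a WHERE clause stands in for it.

```python
import sqlite3


def column_names(connection: sqlite3.Connection, table: str) -> list:
    # PRAGMA table_info returns one row per column; index 1 is the name.
    return [row[1] for row in connection.execute(f"PRAGMA table_info({table})")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example (id INTEGER, value TEXT)")
conn.execute("INSERT INTO example VALUES (1, 'old')")

# Truncate: all entries are removed, the table and its schema stay.
conn.execute("DELETE FROM example")
rows_after_truncate = conn.execute("SELECT COUNT(*) FROM example").fetchone()[0]
columns_after_truncate = column_names(conn, "example")

# Drop: the table itself is removed and can be recreated with a new schema.
conn.execute("DROP TABLE example")
conn.execute("CREATE TABLE example (id INTEGER, value TEXT, ts INTEGER)")
columns_after_drop = column_names(conn, "example")

print(rows_after_truncate, columns_after_truncate)
print(columns_after_drop)
conn.close()
```

Truncating is the better fit when the existing table definition should be kept. Dropping is needed when the table should follow the schema of the incoming stream.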
Now you can use the STREAM TO statement to write the output stream of a regular SELECT statement into the sink:
Code Block |
---|
STREAM TO dbsink SELECT * FROM samplestream |
Keep union compatibility in mind: the output schema of "SELECT * FROM samplestream" that is written into "dbsink" must be union-compatible with the table behind "dbsink", which in this case is the schema of example.
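A simple way to think about union compatibility is sketched below in Python. It assumes the usual relational definition (same number of attributes, with matching data types position by position) and compares type names literally. The actual check performed by the system may accept more type combinations. The function name `is_union_compatible` and the sample schemas are hypothetical.

```python
from typing import List, Tuple

# A schema is an ordered list of (attribute name, data type) pairs.
Schema = List[Tuple[str, str]]


def is_union_compatible(stream_schema: Schema, table_schema: Schema) -> bool:
    """Same attribute count and equal type names at each position."""
    if len(stream_schema) != len(table_schema):
        return False
    return all(
        stream_type.upper() == table_type.upper()
        for (_, stream_type), (_, table_type) in zip(stream_schema, table_schema)
    )


samplestream = [("id", "INTEGER"), ("value", "STRING")]
example_ok = [("key", "integer"), ("payload", "string")]
example_bad = [("id", "INTEGER"), ("value", "STRING"), ("ts", "LONG")]

print(is_union_compatible(samplestream, example_ok))   # names may differ
print(is_union_compatible(samplestream, example_bad))  # attribute count differs
```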