Remark (2017.11.30): CQL is no longer part of the default download. If you want to use CQL, you must install it first; see How to install new features. A convenient way is to use "#REQUIRED de.uniol.inf.is.odysseus.parser.cql2.feature.feature.group" (see Features and Updates).

The Continuous Query Language (CQL) of Odysseus is an SQL-based declarative query language. This document describes its basic concepts and shows how to formulate queries with it.

Create Streams

The CREATE STREAM statement tells Odysseus where the data comes from. It normally opens a connection to a source, e.g. a sensor or a server.

The stream always consists of a name (here: "category") and a schema:

CREATE STREAM category (id INTEGER, name STRING, description STRING, parentid INTEGER) ....

The schema is followed by a connection property that tells Odysseus how and where the stream can be accessed. The two most common options are the channel format and the generic access framework (which we recommend).

Odysseus Channel Format

Odysseus has a built-in byte-based format for transferring data, which is used, for example, by the nexmark example. It is called a "CHANNEL" connection and looks as follows:

CREATE STREAM nexmark:person (timestamp STARTTIMESTAMP,id INTEGER,name STRING,email STRING,creditcard STRING,city STRING,state STRING) CHANNEL localhost : 65440

Generic Access Framework

However, the newer and recommended way is generic access, which offers different protocols, wrappers etc. as described in Access framework. An example would be:


CREATE STREAM nexmark:person (timestamp STARTTIMESTAMP, id INTEGER, name STRING, email STRING, creditcard STRING, city STRING, state STRING)
    WRAPPER 'GenericPush' 
    PROTOCOL 'SizeByteBuffer'
    TRANSPORT 'NonBlockingTcp'
    DATAHANDLER 'Tuple'
    OPTIONS ( 'port' '65440', 'host' 'odysseus.offis.uni-oldenburg.de', 'ByteOrder' 'Little_Endian')

As you can see, each keyword maps directly to one part of the access framework, so you can use any Protocol Handler, Data Handler and Transport Handler in a CREATE STREAM statement. The wrapper must also exist; available wrappers are e.g. GenericPush and GenericPull (see also Access framework). The OPTIONS parameter is optional and is a comma-separated list of key-value pairs, where each key and each value is enclosed in quotation marks.
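
The mapping between the keywords and the parts of a statement can be sketched as a small helper that assembles such a statement as text. This is only an illustration in Python: the function build_create_stream and its parameters are hypothetical and not part of Odysseus, the schema is shortened, and the output merely mirrors the layout of the example above.

```python
def build_create_stream(name, schema, wrapper, protocol, transport,
                        datahandler, options=None):
    """Render a CREATE STREAM statement as text (hypothetical helper)."""
    # Schema: "attribute TYPE" pairs, comma-separated, in parentheses.
    columns = ", ".join(f"{attribute} {datatype}" for attribute, datatype in schema)
    lines = [
        f"CREATE STREAM {name} ({columns})",
        f"    WRAPPER '{wrapper}'",
        f"    PROTOCOL '{protocol}'",
        f"    TRANSPORT '{transport}'",
        f"    DATAHANDLER '{datahandler}'",
    ]
    # OPTIONS is optional: quoted key/value pairs, comma-separated.
    if options:
        pairs = ", ".join(f"'{key}' '{value}'" for key, value in options.items())
        lines.append(f"    OPTIONS ( {pairs})")
    return "\n".join(lines)


statement = build_create_stream(
    name="nexmark:person",
    schema=[("timestamp", "STARTTIMESTAMP"), ("id", "INTEGER"), ("name", "STRING")],
    wrapper="GenericPush",
    protocol="SizeByteBuffer",
    transport="NonBlockingTcp",
    datahandler="Tuple",
    options={"port": "65440", "host": "localhost"},
)
print(statement)
```

Leaving out the options argument yields a statement without an OPTIONS line, which matches the remark that this parameter is optional.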

Create Views

You can also create a view, which is a logical view on a result of a continuous query.

CREATE VIEW nexQuery FROM (
    SELECT b.auction, DolToEur(b.price) AS euroPrice, b.bidder, b.datetime FROM nexmark:bid [UNBOUNDED] AS b
)

This allows you to reuse the query, e.g. as follows:

SELECT * FROM nexQuery 

Create Sinks

Similar to creating sources for incoming data with "create stream", you can also create sinks for outgoing data. The notation is very similar to "create stream". Since sinks are also based on the Access Framework, you can likewise use different Protocol Handlers, Data Handlers and Transport Handlers. For example, the following creates a sink that writes a CSV file:

CREATE SINK writeout (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE)
    WRAPPER 'GenericPush'
    PROTOCOL 'CSV'
    TRANSPORT 'File'
    DATAHANDLER 'Tuple'
    OPTIONS ( 'filename' 'E:\test')


Drop Streams

You can drop a stream with:

DROP STREAM category

Since this statement returns an error if the stream "category" does not exist, you can add "IF EXISTS" to avoid the error (Odysseus then checks whether the stream exists before running the drop):

DROP STREAM category IF EXISTS
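
The difference between the two forms can be illustrated with a toy registry in Python. The class Catalog and its methods are invented for this sketch and do not reflect how Odysseus manages its sources internally; the sketch only models the behaviour described above (an error without IF EXISTS, a silent no-op with it).

```python
class Catalog:
    """Toy registry of stream names (hypothetical, not an Odysseus class)."""

    def __init__(self):
        self.streams = set()

    def create_stream(self, name):
        self.streams.add(name)

    def drop_stream(self, name, if_exists=False):
        """Remove a stream; return True if something was actually dropped."""
        if name not in self.streams:
            if if_exists:
                # Corresponds to DROP STREAM ... IF EXISTS: nothing to do, no error.
                return False
            # Corresponds to a plain DROP STREAM on an unknown stream.
            raise KeyError(f"stream '{name}' does not exist")
        self.streams.remove(name)
        return True


catalog = Catalog()
catalog.create_stream("category")
print(catalog.drop_stream("category"))                  # True
print(catalog.drop_stream("category", if_exists=True))  # False
```

Calling catalog.drop_stream("category") a second time without if_exists=True would raise the error instead.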

Drop Sinks

You can drop a sink with:

DROP SINK category

Since this statement returns an error if the sink "category" does not exist, you can add "IF EXISTS" to avoid the error (Odysseus then checks whether the sink exists before running the drop):

DROP SINK category IF EXISTS

Continuous Query

A CQL statement is similar to an SQL statement: a continuous query consists of a SELECT, a FROM, a WHERE, a GROUP BY and a HAVING part.

We use the following example to explain the basic parts of a CQL query.

ATTENTION: Currently, the * notation is not allowed in aggregate functions, so use count(attribute) instead of count(*). The parser error is not very helpful in this case: "Caused by: de.uniol.inf.is.odysseus.parser.cql.parser.ParseException: Encountered " "SELECT" "SELECT "" at line 1, column 1. Was expecting:   "REVOKE" ..."

SELECT auction, AVG(price) AS aprice 
FROM bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME]
WHERE auction > 10 
GROUP BY auction 
HAVING aprice<100.0

Select

SELECT auction, AVG(price) AS aprice...

From

... FROM bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME]... 

The biggest difference between standard SQL and CQL is the FROM part, because it lets you define windows. In CQL, windows are written in square brackets after the source name.

The following parameters are available for time based windows (TIME):

  • SIZE: Defines the size of the window, e.g. 60 MINUTES
  • ADVANCE: Defines the time after which the window moves, e.g. 1 MINUTE
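
To make SIZE and ADVANCE more concrete, the following Python sketch lists the windows that a single element falls into. It is only a simplified model: it assumes that windows start at multiples of ADVANCE and cover the half-open range [start, start + SIZE), with all values given in the same time unit. The function name window_starts is made up for this illustration, and the way Odysseus realizes windows internally may differ.

```python
def window_starts(timestamp, size, advance):
    """Start times of all windows [start, start + size) containing timestamp.

    Assumption of this sketch: windows begin at non-negative multiples
    of `advance`; all arguments are integers in the same time unit.
    """
    # Smallest multiple of `advance` that is strictly greater than timestamp - size.
    first = max(0, ((timestamp - size) // advance + 1) * advance)
    return list(range(first, timestamp + 1, advance))


# SIZE 4, ADVANCE 2: an element at time 5 lies in the windows [2, 6) and [4, 8).
print(window_starts(5, size=4, advance=2))          # [2, 4]

# SIZE 60 MINUTES, ADVANCE 1 MINUTE: every element lies in 60 overlapping windows.
print(len(window_starts(125, size=60, advance=1)))  # 60

# SIZE equal to ADVANCE: the windows do not overlap.
print(window_starts(125, size=60, advance=60))      # [120]
```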

The following parameters are available for element based windows (TUPLE):

  • SIZE: Defines the size of the window in elements
  • ADVANCE: Defines the number of elements after which the window moves
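
An element based window can be pictured in the same way. The Python sketch below keeps the last SIZE elements and reports the window content every ADVANCE elements. It assumes that only completely filled windows are reported, which is a simplification chosen for this example; tuple_windows is a hypothetical name and not an Odysseus function.

```python
from collections import deque


def tuple_windows(elements, size, advance):
    """Return the content of each full window of `size` elements,
    moving forward by `advance` elements between windows."""
    window = deque(maxlen=size)  # silently drops the oldest element when full
    result = []
    for count, element in enumerate(elements, start=1):
        window.append(element)
        # Report once the window is full, then after every `advance` elements.
        if count >= size and (count - size) % advance == 0:
            result.append(list(window))
    return result


prices = [10, 20, 30, 40, 50, 60]
print(tuple_windows(prices, size=3, advance=1))
# [[10, 20, 30], [20, 30, 40], [30, 40, 50], [40, 50, 60]]
print(tuple_windows(prices, size=3, advance=3))
# [[10, 20, 30], [40, 50, 60]]
```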

Further information about windows can be found here.

More about the window syntax can be found at Select syntax.

Where

... WHERE auction > 10 ...

Group By and Having

... GROUP BY auction 
HAVING aprice<100.0
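
What the WHERE, GROUP BY and HAVING parts of the example query compute can be sketched in Python for the content of a single window. The function avg_price_per_auction and the sample bids are invented for this illustration; windowing itself is left out, and the sketch says nothing about how Odysseus evaluates the query.

```python
from collections import defaultdict


def avg_price_per_auction(bids):
    """Emulate the example query on one window of (auction, price) pairs."""
    groups = defaultdict(list)
    for auction, price in bids:
        if auction > 10:                      # WHERE auction > 10
            groups[auction].append(price)     # GROUP BY auction
    result = {}
    for auction, prices in groups.items():
        aprice = sum(prices) / len(prices)    # AVG(price) AS aprice
        if aprice < 100.0:                    # HAVING aprice<100.0
            result[auction] = aprice
    return result


# Hypothetical window content: (auction, price)
bids = [(5, 20.0), (11, 50.0), (11, 70.0), (12, 150.0), (12, 250.0)]
print(avg_price_per_auction(bids))  # {11: 60.0}
```

Auction 5 is removed by the WHERE condition before grouping, while auction 12 is removed by the HAVING condition after its average (200.0) has been computed.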

Stream To

If you want to stream your results into a sink, you first have to create a sink.

STREAM TO writeout SELECT * FROM nexmark:person WHERE...

This example would push all data that is produced by "SELECT * FROM nexmark:person WHERE..." into the sink named writeout, which is a file-writer in our case (see above).

Examples

Here are some examples of what can be used in the SELECT part of a CQL statement:

#PARSER CQL
#TRANSCFG Standard
#DOREWRITE false
#QUERY
DROP STREAM bid IF EXISTS
#QUERY
ATTACH STREAM bid (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE) CHANNEL localhost : 65442
#QUERY
DROP STREAM person IF EXISTS
#QUERY
ATTACH STREAM person (timestamp STARTTIMESTAMP,id INTEGER,name STRING,email STRING,creditcard STRING,city STRING,state STRING) CHANNEL localhost : 65440
/// SIMPLE PROJECTS
#QUERY 
SELECT * FROM bid
#QUERY
SELECT bid.* FROM bid
#QUERY
SELECT price FROM bid
#QUERY
SELECT bidder, price FROM bid
#QUERY
SELECT timestamp, auction, bidder, datetime, price FROM bid
/// PROJECTS WITH RENAMED SOURCE BUT NO USE IN PROJECT
#QUERY 
SELECT * FROM bid AS b
#QUERY
SELECT bid.* FROM bid AS b
#QUERY
SELECT price FROM bid AS b
#QUERY
SELECT bidder, price FROM bid AS b
#QUERY
SELECT timestamp, auction, bidder, datetime, price FROM bid AS b
/// PROJECTS WITH RENAMED SOURCE WITH USE IN PROJECT
#QUERY 
SELECT * FROM bid AS b
#QUERY
SELECT b.* FROM bid AS b
#QUERY
SELECT b.price FROM bid AS b
#QUERY
SELECT b.bidder, b.price FROM bid AS b
#QUERY
SELECT b.timestamp, b.auction, b.bidder, b.datetime, b.price FROM bid AS b
/// PROJECTS WITH RENAMED ATTRIBUTES
#QUERY
SELECT price AS p FROM bid
#QUERY
SELECT price AS p, bidder FROM bid
#QUERY
SELECT price AS p, bidder AS b FROM bid
/// PROJECTS WITH RENAMED ATTRIBUTES AND SOURCES
#QUERY
SELECT price AS p FROM bid AS b
#QUERY
SELECT b.price AS p FROM bid AS b
#QUERY
SELECT b.price AS p, b.bidder FROM bid AS b
#QUERY
SELECT b.price AS p, b.bidder AS b FROM bid AS b
/// PROJECTS WITH CONSTANTS, FUNCTIONS AND EXPRESSIONS
#QUERY
SELECT bidder + price AS d FROM bid
#QUERY
SELECT 123.4 * price AS d FROM bid
#QUERY
SELECT 123.4 AS d FROM bid
#QUERY
SELECT 123.4 AS d, price FROM bid
#QUERY
SELECT DolToEur(price) AS d FROM bid
#QUERY
SELECT DolToEur(price) AS d, price FROM bid
#QUERY
SELECT DolToEur(price) * auction AS d FROM bid
#QUERY
SELECT DolToEur(price) * price AS d FROM bid
#QUERY
SELECT DolToEur(price) * auction AS d, price FROM bid
#QUERY
SELECT DolToEur(123.4) AS d FROM bid
#QUERY
SELECT 'test' AS s FROM bid
#QUERY
SELECT 'test' AS s, 123.4 AS d FROM bid
#QUERY
SELECT 'test' AS s, 123.4 AS d, price FROM bid
/// PROJECTS AND SELECTS WITH RENAMED SOURCE WITH USE IN SELECT
#QUERY 
SELECT * FROM bid AS b WHERE b.bidder > 10
#QUERY
SELECT b.* FROM bid AS b WHERE b.bidder > 10
#QUERY
SELECT b.price FROM bid AS b WHERE b.price < 150.0
#QUERY
SELECT b.bidder, b.price FROM bid AS b WHERE b.bidder =  1
#QUERY
SELECT b.timestamp, b.auction, b.bidder, b.datetime, b.price FROM bid AS b WHERE b.price > 100.0
/// PROJECTS WITH RENAMED ATTRIBUTES
#QUERY
SELECT price AS p FROM bid WHERE p < 100
#QUERY
SELECT price AS p, bidder FROM bid WHERE p > 100
#QUERY
SELECT price AS p, bidder AS b FROM bid WHERE b=1 AND p <100
/// AGGREGATES ARE NOT HANDLED LIKE FUNCTIONS AND MAY HAVE A GROUPING ETC.
#QUERY
SELECT AVG(price) AS aprice FROM bid
#QUERY
SELECT AVG(price) AS aprice FROM bid GROUP BY auction
#QUERY
SELECT auction, AVG(price) AS aprice FROM bid GROUP BY auction
#QUERY
SELECT auction, AVG(price) AS aprice FROM bid GROUP BY auction HAVING aprice<100.0
/// JOINS AND SELFJOINS
#QUERY
SELECT auction, bidder FROM bid, person WHERE bid.bidder = person.id
#QUERY
SELECT auction FROM bid AS b, person AS p WHERE b.bidder = p.id
#QUERY
SELECT auction AS a, bidder AS b FROM bid, person WHERE a = b
#QUERY
SELECT auction, bidder FROM bid, person WHERE bid.bidder = person.id
#QUERY
SELECT left.* FROM bid AS left, bid AS right WHERE left.bidder = right.bidder
#QUERY
SELECT a.auction AS aid FROM bid AS a, bid AS b WHERE aid=b.auction

And here are some more complex examples, based on nexmark:

#PARSER CQL
#TRANSCFG Standard
#DROPALLQUERIES
#DROPALLSOURCES
#QUERY
CREATE STREAM nexmark:person (timestamp STARTTIMESTAMP, id INTEGER, name STRING, email STRING, creditcard STRING, city STRING, state STRING) CHANNEL localhost : 65440
#QUERY
CREATE STREAM nexmark:bid (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE) CHANNEL localhost : 65442
#QUERY
CREATE STREAM nexmark:auction (timestamp STARTTIMESTAMP, id INTEGER, itemname STRING, description STRING, initialbid INTEGER, reserve INTEGER, expires LONG, seller INTEGER, category INTEGER) CHANNEL localhost : 65441
#QUERY
CREATE STREAM nexmark:category (id INTEGER, name STRING, description STRING, parentid INTEGER) CHANNEL localhost : 65443

#PARSER CQL
#TRANSCFG Standard
#DROPALLQUERIES
/// Query 1: Currency Conversion
#QNAME Nexmark:Q1
#ADDQUERY
SELECT auction, DolToEur(price) AS euro, bidder, datetime 
FROM nexmark:bid [UNBOUNDED];

///Query 2: Selection
#QNAME Nexmark:Q2
#ADDQUERY
SELECT auction, price 
FROM nexmark:bid 
WHERE auction=7 OR auction=20 OR auction=21 OR auction=59 OR auction=87;

///Query 3: Local Item Suggestion
#QNAME Nexmark:Q3
#ADDQUERY
SELECT p.name, p.city, p.state, a.id 
FROM nexmark:auction [UNBOUNDED] AS a, nexmark:person [UNBOUNDED] AS p 
WHERE a.seller=p.id AND (p.state='Oregon' OR p.state='Idaho' OR p.state='California') AND a.category = 10;


///Query 4: Average Price for a Category
#QNAME Nexmark:Q4
#ADDQUERY
SELECT AVG(q.final) 
FROM nexmark:category [UNBOUNDED] AS c, 
    (SELECT MAX(b.price) AS final, a.category 
     FROM nexmark:auction [UNBOUNDED] AS a, nexmark:bid [UNBOUNDED] AS b  
     WHERE a.id = b.auction AND b.datetime < a.expires AND  a.expires < Now() 
     GROUP BY a.id, a.category) AS q 
WHERE q.category = c.id 
GROUP BY c.id;

///Query 5: Hot Items
#QNAME Nexmark:Q5
#ADDQUERY
SELECT b2.auction 
FROM   (SELECT b1.auction, COUNT(auction) AS num
        FROM nexmark:bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME] AS b1
        GROUP BY b1.auction
        ) AS b2
WHERE num >= ALL   (SELECT count(auction) AS c 
                    FROM nexmark:bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME] AS b2
                    GROUP BY b2.auction);
        
///Query 6: Average Selling Price by Seller
#QNAME Nexmark:Q6
#ADDQUERY
SELECT AVG(Q.final) AS s, Q.seller
FROM (SELECT MAX(B.price) AS final, A.seller
      FROM nexmark:auction [UNBOUNDED] AS A, nexmark:bid [UNBOUNDED] AS B
      WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < ${NOW}
      GROUP BY A.id, A.seller) [SIZE 10 TUPLE PARTITION BY A.seller] AS Q
GROUP BY Q.seller;

///Query 7: Monitor New Users
#QNAME Nexmark:Q7
#ADDQUERY
SELECT p.id, p.name, a.reserve 
FROM nexmark:person [SIZE 12 HOURS ADVANCE 1 TIME] AS p, nexmark:auction [SIZE 12 HOURS ADVANCE 1 TIME] AS a 
WHERE p.id = a.seller;