This page describes how to replicate operators in OdysseusNet. The approach differs slightly from the replication offered by the peer feature, which is described in Replication.

You can replicate all replicable operators that follow a given operator in the plan (along the dataflow). The given operator marks the starting point and is not itself replicated.

To do so, specify the operator ID in the modification options:

#NODE_MODIFICATION REPLICATION [OPERATOR_ID] <REPLICATION_DEGREE>

Then assign that ID to an operator:

OUTPUT_STREAM = OPERATOR({OPTIONS, ID='OPERATOR_ID'},INPUT_STREAM)
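
As a minimal sketch, the two pieces fit together as shown in the fragment below. The ID window2, the degree 2, and the stream names are illustrative values taken from the full example on this page; any other operator ID should work the same way, provided the same ID appears in both places. The fragment is not a complete query: it omits the parser, partition, and allocation settings as well as the source definitions.

```
/// Modification phase: replication degree 2, starting after the operator with the ID 'window2'
#NODE_MODIFICATION REPLICATION window2 2

/// The ID set in the operator options must match the ID used in the line above
windowed_bid = TIMEWINDOW({SIZE=10000, SLIDE=10000, ID='window2'},nexmark:bid)
```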


For example, the following query replicates only the JOIN operator:


#PARSER PQL
#CONFIG DISTRIBUTE true

#NODE_PARTITION OPERATORCLOUD           /// Select OperatorCloud strategy for Partition phase
#NODE_MODIFICATION REPLICATION window2 2        /// Select Replication strategy for Modification phase (replication degree 2; replication starts after operator window2, which is itself excluded)
#NODE_ALLOCATION QUERYCOUNT                   /// Select QueryCount strategy for Allocation phase
#NODE_PREPROCESSOR APPEND_KAFKA
///#WORKER_TARGET_PARAMETER (host=localhost:29092)
#WORKER_TARGET_PARAMETER (host=broker:9092)


#RUNQUERY
nexmark:auction = ACCESS({source='nexmark:auction',
    wrapper='GenericPush',
    transport='TCPClient',
    protocol='SizeByteBuffer',
    dataHandler='Tuple',
    options=[
        ['host', 'nexmark'],
        ['port', '65441'],
        ['ByteOrder', 'LittleEndian']
        ],
    schema=[
        ['timestamp', 'STARTTIMESTAMP'],
        ['id', 'INTEGER'],
        ['itemname', 'STRING'],
        ['description', 'STRING'],
        ['initialbid', 'INTEGER'],
        ['reserve', 'INTEGER'],
        ['expires', 'LONG'],
        ['seller', 'INTEGER'],
        ['category', 'INTEGER']
        ]
    })

nexmark:bid = ACCESS({source='nexmark:bid',
    wrapper='GenericPush',
    transport='TCPClient',
    protocol='SizeByteBuffer',
    dataHandler='Tuple',
    options=[
        ['host', 'nexmark'],
        ['port', '65442'],
        ['ByteOrder', 'LittleEndian']
        ],
    schema=[
        ['nexmark:bid','timestamp', 'STARTTIMESTAMP'],
        ['nexmark:bid','auction', 'INTEGER'],
        ['nexmark:bid','bidder', 'INTEGER'],
        ['nexmark:bid','datetime', 'LONG'],
        ['nexmark:bid','price', 'DOUBLE']
        ]
    })

windowed_auction = TIMEWINDOW({SIZE=10000, SLIDE=10000},nexmark:auction)
windowed_bid = TIMEWINDOW({SIZE=10000, SLIDE=10000, ID='window2'},nexmark:bid)

out = JOIN({
          PREDICATE='id = auction'
        },
        windowed_auction,
        windowed_bid
      )