RuleEngine is a coordinator between OAM programs and the external world.
RuleEngine is built on top of the Duct Framework and relies heavily on the Integrant configuration system.
The engine's components are modular and decoupled from each other. Let's look at the current message-handling pipeline:
kafka-consumer -> decoder -> crm -> oam -> coeffects -> persistence
This pipeline is assembled at the configuration stage; each step changes or extends the message, or executes side effects, and then calls the next one. Because the configuration is just data, we can build tools to assemble systems, even programmatically (a web UI?), including the set of supported coeffects. We can think of this as "building an environment" for OAM programs.
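As a rough, hypothetical sketch (not the actual config), the pipeline wiring could be expressed as Integrant data along these lines; the component keys match the stages listed below, but the `:next` convention, option names, and topic names are assumptions:

```clojure
;; Hypothetical sketch of the pipeline as Integrant configuration data.
;; Component keys mirror the stages described below; the :next wiring
;; convention and the concrete option names are illustrative assumptions.
{:re/kafka-consumer    {:topics ["events"]
                        :next   #ig/ref :re.stage/decoder}
 :re.stage/decoder     {:next #ig/ref :re.stage/crm}
 :re.stage/crm         {:next #ig/ref :re/oam}
 :re/oam               {:next #ig/ref :re.stage/coeffects}
 :re.stage/coeffects   {:handlers {:event #ig/ref :re.coeffect/http
                                   :timer #ig/ref :re.coeffect/timer-service}
                        :next     #ig/ref :re.stage/persistence}
 :re.stage/persistence {}}
```

Because this is plain data, an alternative environment (different coeffect handlers, different topics) is just a different map.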
Running systems don't have any global state, so it's possible to run several different systems inside a single JVM instance. This is useful at least for development (running tests and dev versions at the same time from the REPL), and we may find more use cases in the future.
All computations are executed inside the kafka-consumer thread pool. This is a simple approach and fits the Kafka messaging platform well, but it may not be the best from a throughput perspective.
- `:re/oam` provides `:run` and `:unblock` functions to execute OAM programs
- `:re/kafka-consumer` consumes kafka topics and calls `next` on each message
- `:re.stage/decoder` parses RoutableMessages, replaces the `:value` of the message, and calls `next`
- `:re.stage/crm` contains the business logic; it manages entities, knows which programs should be run, and unblocks the corresponding programs
- `:re.stage/rpc` receives RPC responses for particular states and unblocks them
- `:re.stage/coeffects` runs the corresponding coeffect handler for each coeffect in the message
- `:re.stage/persistence` saves the state to Cassandra if the program is still running, or deletes the record if it is not
- `:re.coeffect/http` handles the `event` coeffect; it sends the event to a kafka topic using the `encoder`
- `:re.coeffect/timer-service` handles the `timer` coeffect; it manages a local timeout service (without persistence) and sends TimerExpired events to kafka
- `:re/encoder` provides a function to encode a message as protobuf
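To make the stage contract concrete, here is a minimal hypothetical sketch (not the real code) of how a stage like `:re.stage/decoder` could be implemented: Integrant initializes it with the next stage's function, and it extends the message before calling `next`. The decoder placeholder and the exact keys used are assumptions:

```clojure
(require '[integrant.core :as ig])

;; Placeholder standing in for the real RoutableMessage protobuf decoder.
(defn decode-routable-message [payload]
  {:decoded payload})

;; A stage is initialized with the next stage and returns a function of
;; the message map; it transforms the message and passes it on.
(defmethod ig/init-key :re.stage/decoder [_ {:keys [next]}]
  (fn [message]
    (-> message
        (assoc :core/message (decode-routable-message (:core/binary-message message)))
        next)))
```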
:core/now"current" time:core/binary-message:core/message:core/state-id:core/state:core/state-meta:core/program:core/effectssets of effects to be executed by :leave of stage.effects:oam/values:oam/coeffects:oam/state:oam/bc:oam/coeffect-id:oam/coeffect-value:crm/entity:crm/campaign:kafka/key:kafka/offset:kafka/partition:kafka/topic
- `git clone https://github.com/xray-tech/xorc-xray.git --recursive`
- Set up docker authentication as described in the Infra README
To execute tests:
docker-compose run re lein test

To execute tests continuously:
docker-compose run re lein test-refresh

# If the compiler was modified
docker-compose build
# If a new Kotlin OAM jar was published
cd re; lein -U deps
# run
docker-compose -f docker-compose.yml up

While it's still possible to do "normal" development via cider-jack-in (or analogs), to have all the required external dependencies (protobuf, kafka, scylladb) in place you can start a headless REPL with docker-compose and connect to it from your editor:
docker-compose -f docker-compose.yml -f docker-compose.repl.yml up

The main drawback of this approach is that goto-* functionality doesn't work. I currently don't have an easy solution for this.
The `dev` namespace contains useful functions to play with the system, like `add-program` and `send-event`.
Connect to the nREPL via your IDE or the command line, then switch to the dev namespace and reset the system:
lein repl :connect localhost:4011
(dev)
(reset)

Load a compiled orc program
This program (source here) listens for a checkout event and prints the number of occurrences for each entity.
To add it to the dev universe
;; Add "re/oam/subscription.oam" program to dev universe
;; See re/docs/programs/subscription.orc for source code
(add-program "dev" "re/oam/subscription.oam")and start the program for each entity in that universe
;; It starts all active programs in the dev universe.
(send-event "dev" "events.SDKEvent" {"header" {"type" "events.SDKEvent"
                                               "recipientId" "dev-entity"
                                               "createdAt" 0
                                               "source" "repl"}
                                     "event" {"name" "dev-event"}
                                     "environment" {"appId" "dev"}})

Send events
You can then send an event using `send-event`, specifying the `recipientId` of the entity you want the program to run for.
;; subscription.oam subscribes to {events\.SDKEvent}{checkout} events,
;; any other events will be ignored
(send-event "dev" "events.SDKEvent" {"header" {"type" "events.SDKEvent"
"recipientId" "dev-entity"
"createdAt" 0
"source" "repl"}
"event" {"name" "checkout"
"properties" [{"key" "sum"
"numberValue" 1000}]}
"environment" {"appId" "dev"}})In the docker logs, you'll see the output each time you send the event for a given entity
re_1 | 09:32:47.398 INFO [] re.stage.log-oam-values - Value iteration: 4; sum: 1000.0
Run Orc directly
You can POST an Orc source directly to http://localhost:4012/run to run it:
curl -i -X POST -H "Content-Type:application/orc" -d '1 | 2' 'http://localhost:4012/run'
{"values":[1,2]}or using a file with a program that will run:
curl -i -X POST -H "Content-Type:application/orc" --data-binary @re/docs/programs/bench.orc 'http://localhost:4012/run'
{"values":[0],"state":"876e8ecd-231a-4d21-a9fd-df01d4e0d8ee"}Note the state id
The Jaeger UI is accessible at http://localhost:16686/
You can filter the traces by the state id of your program (see above) using the tag filter:
re.state=876e8ecd-231a-4d21-a9fd-df01d4e0d8ee
To use a local version of the JVM OAM, install it into your local Maven repository (./gradlew :runtime:install) and restart xray.