mirror of
https://github.com/RoganDawes/P4wnP1_aloa.git
synced 2025-03-18 13:52:04 +01:00
235 lines
6.2 KiB
Go
235 lines
6.2 KiB
Go
package hid
|
|
|
|
import (
|
|
"github.com/robertkrimen/otto"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"log"
|
|
"context"
|
|
"encoding/json"
|
|
)
|
|
|
|
var vmNum = 0
|
|
var jobNum = 0
|
|
|
|
|
|
type AsyncOttoJob struct {
|
|
ctx context.Context
|
|
Cancel context.CancelFunc
|
|
executingVM *AsyncOttoVM
|
|
Id int
|
|
Source interface{}
|
|
finishedNotify chan struct{}
|
|
isFinished bool //finishedNotify could only be used to signal finish (blocking wait for chanel close), not for non blocking polling, thus we add a state var
|
|
ResultErr error
|
|
ResultValue otto.Value
|
|
}
|
|
|
|
func (job *AsyncOttoJob) Result() interface{} {
|
|
goRes,_ := job.ResultValue.Export() //error is always nil (otto doc)
|
|
return goRes
|
|
}
|
|
|
|
func (job *AsyncOttoJob) SetFinished() {
|
|
job.executingVM = nil //avoid accessing the vm from this job again
|
|
job.isFinished = true
|
|
// we call the cancelFunc, this would only issue an interrupt to the vm, in case the VM would still be set executingVM, but assures ctx.Done listener could react
|
|
// to the finished Job
|
|
job.Cancel()
|
|
close(job.finishedNotify)
|
|
}
|
|
|
|
func (job *AsyncOttoJob) WaitFinished() {
|
|
<-job.finishedNotify
|
|
}
|
|
|
|
func (job *AsyncOttoJob) GetVMId() (vmID int, err error) {
|
|
if vm:=job.executingVM; vm == nil {
|
|
return -1, errors.New("Not assigned to VM")
|
|
} else {
|
|
return vm.Id, nil
|
|
}
|
|
|
|
}
|
|
|
|
|
|
func (job *AsyncOttoJob) WaitResult() (otto.Value, error) {
|
|
job.WaitFinished()
|
|
return job.ResultValue, job.ResultErr
|
|
}
|
|
|
|
|
|
func (job *AsyncOttoJob) ResultJsonString() (string, error) {
|
|
goRes := job.Result() //error is always nil (otto doc)
|
|
json, err := json.Marshal(goRes)
|
|
if err != nil {return "",err}
|
|
return string(json),nil
|
|
}
|
|
|
|
type AsyncOttoVM struct {
|
|
vm *otto.Otto
|
|
isWorking bool
|
|
sync.Mutex
|
|
Id int
|
|
eventHandler EventHandler
|
|
}
|
|
|
|
func (avm *AsyncOttoVM) HandleEvent(event Event) {
|
|
fmt.Printf("!!! AsyncOtto Event: %+v\n", event)
|
|
}
|
|
|
|
func (avm *AsyncOttoVM) SetEventHandler(handler EventHandler) {
|
|
avm.eventHandler = handler
|
|
}
|
|
|
|
func (avm *AsyncOttoVM) SetDefaultHandler() {
|
|
avm.SetEventHandler(avm)
|
|
}
|
|
|
|
func (avm *AsyncOttoVM) emitEvent(event Event) {
|
|
if avm.eventHandler == nil { return }
|
|
avm.eventHandler.HandleEvent(event)
|
|
}
|
|
|
|
func NewAsyncOttoVM(vm *otto.Otto) *AsyncOttoVM {
|
|
vm.Interrupt = make(chan func(), 1) //set Interrupt channel
|
|
res := &AsyncOttoVM{
|
|
isWorking: false,
|
|
vm: vm,
|
|
Id: vmNum,
|
|
}
|
|
res.SetDefaultHandler()
|
|
vmNum++
|
|
return res
|
|
}
|
|
|
|
func NewAsyncOttoVMClone(vm *otto.Otto) *AsyncOttoVM {
|
|
return NewAsyncOttoVM(vm.Copy())
|
|
}
|
|
|
|
func (avm AsyncOttoVM) IsWorking() bool {
|
|
avm.Lock()
|
|
res := avm.isWorking
|
|
avm.Unlock()
|
|
return res
|
|
}
|
|
|
|
func (avm *AsyncOttoVM) SetWorking(working bool) {
|
|
avm.Lock()
|
|
avm.isWorking = working
|
|
avm.Unlock()
|
|
fmt.Printf("VM %d set to isWorking: %v\n", avm.Id, working)
|
|
return
|
|
}
|
|
|
|
func (avm *AsyncOttoVM) RunAsync(ctx context.Context, src interface{}, anonymousSelfInvoked bool) (job *AsyncOttoJob, err error) {
|
|
if avm.IsWorking() {
|
|
return job, errors.New(fmt.Sprintf("VM %d couldn't start new job, because it is still running one"))
|
|
}
|
|
|
|
avm.SetWorking(true)
|
|
|
|
//create job
|
|
jobNum++ //jobs start from 1 not 0
|
|
ctx,cancel := context.WithCancel(ctx)
|
|
job = &AsyncOttoJob{
|
|
Id: jobNum,
|
|
ctx: ctx,
|
|
Cancel: cancel,
|
|
executingVM: avm,
|
|
Source: src,
|
|
finishedNotify: make(chan struct{}),
|
|
}
|
|
|
|
fmt.Printf("RunAsync called for VM %d\n", avm.Id)
|
|
|
|
go func(avm *AsyncOttoVM) {
|
|
select {
|
|
case <-job.ctx.Done():
|
|
if job.executingVM != nil {
|
|
fmt.Printf("Job %d received IRQ, sending to VM%d\n", job.Id, avm.Id)
|
|
job.executingVM.vm.Interrupt <- func() {
|
|
log.Printf("VM %d EXECUTED INTERRUPT FUNCTION\n", avm.Id)
|
|
panic(haltirq)
|
|
}
|
|
} else {
|
|
fmt.Printf("Job %d received IRQ, NOT sending to VM as not attached to any\n", job.Id)
|
|
}
|
|
}
|
|
}(avm)
|
|
|
|
go func(avm *AsyncOttoVM) {
|
|
defer func() { //runs after avm.vm.Run() returns (because script finished or was interrupted)
|
|
defer avm.SetWorking(false)
|
|
if caught := recover(); caught != nil {
|
|
fmt.Printf("VM %d CAUGHT INTERRUPT, ENDING JOB %d\n", avm.Id, job.Id)
|
|
if caught == haltirq {
|
|
job.ResultErr = errors.New(fmt.Sprintf("Execution of job %d on VM %d interrupted\n", job.Id, avm.Id))
|
|
|
|
avm.emitEvent(Event{
|
|
Job:job,
|
|
Vm:job.executingVM,
|
|
Type:EventType_JOB_CANCELLED,
|
|
Message:"Script execution cancelled",
|
|
})
|
|
|
|
// signal Job finished
|
|
job.SetFinished()
|
|
return
|
|
}
|
|
panic(caught) //re-raise panic, as it isn't `haltirq`
|
|
}
|
|
return
|
|
}()
|
|
|
|
fmt.Printf("START JOB %d SCRIPT ON VM %d\n", job.Id, avm.Id) //DEBUG
|
|
|
|
//short pre-run to set JobID and VMID (ignore errors)
|
|
avm.vm.Run(fmt.Sprintf("JID=%d;VMID=%d;\n", job.Id, avm.Id))
|
|
|
|
// try to wrap source into a anonymous, self-invoked function, to avoid polluting global scope (only if source is string)
|
|
if src, isString := job.Source.(string); isString {
|
|
if anonymousSelfInvoked {
|
|
src = fmt.Sprintf("(function() {\n%s\n})();",src)
|
|
job.ResultValue, job.ResultErr = avm.vm.Run(src) //store result
|
|
job.SetFinished() // signal job finished
|
|
}
|
|
} else {
|
|
job.ResultValue, job.ResultErr = avm.vm.Run(job.Source) //store result
|
|
job.SetFinished() // signal job finished
|
|
}
|
|
|
|
//Emit event + print DEBUG
|
|
evRes := Event{ Vm: avm, Job: job }
|
|
//evRes.ScriptSource,_ = job.Source.(string) //if string, attach source to event
|
|
if job.ResultErr == nil {
|
|
jRes,jErr := job.ResultJsonString()
|
|
if jErr == nil {
|
|
evRes.Type = EventType_JOB_SUCCEEDED
|
|
evRes.Message = fmt.Sprintf("JOB %d on VM %d SUCCEEDED WITH RESULT: %s", job.Id, avm.Id, jRes)
|
|
fmt.Println(evRes.Message)
|
|
} else {
|
|
evRes.Type = EventType_JOB_SUCCEEDED
|
|
evRes.Message = fmt.Sprintf("JOB %d on VM %d SUCCEEDED BUT RESULT COULDN'T BE MARSHALED TO JSON: %v", job.Id, avm.Id, jErr)
|
|
fmt.Println(evRes.Message)
|
|
}
|
|
} else {
|
|
evRes.Type = EventType_JOB_FAILED
|
|
evRes.Message = fmt.Sprintf("JOB %d on VM %d FAILED: %v", job.Id, avm.Id, job.ResultErr)
|
|
fmt.Println(evRes.Message)
|
|
}
|
|
avm.emitEvent(evRes)
|
|
|
|
}(avm)
|
|
|
|
return job,nil
|
|
}
|
|
|
|
|
|
func (avm *AsyncOttoVM) Run(ctx context.Context,src interface{}, anonymousSelfInvoked bool) (val otto.Value, res error) {
|
|
job,err := avm.RunAsync(ctx, src, anonymousSelfInvoked)
|
|
if err != nil { return val,err }
|
|
return job.WaitResult()
|
|
}
|