Files
test2/release/io/netrender/master.py
Martin Poirier 22274d3807 More automatic stuff.
Server can now be set to broadcast on local network (every 10s, approximately 20 bytes of data) where client and slave can pick up its existence. This is on by default.

Default ip address is now "[default]", which means for the master that it will listen to all interface and for the clients and slave that they will automatically work out the master's address from its broadcast.
2009-09-15 19:53:18 +00:00

636 lines
18 KiB
Python

import sys, os
import http, http.client, http.server, urllib, socket
import subprocess, shutil, time, hashlib
from netrender.utils import *
import netrender.model
JOB_WAITING = 0 # before all data has been entered
JOB_PAUSED = 1 # paused by user
JOB_QUEUED = 2 # ready to be dispatched
class MRenderFile:
def __init__(self, filepath, start, end):
self.filepath = filepath
self.start = start
self.end = end
self.found = False
def test(self):
self.found = os.path.exists(self.filepath)
return self.found
class MRenderSlave(netrender.model.RenderSlave):
def __init__(self, name, address, stats):
super().__init__()
self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest()
self.name = name
self.address = address
self.stats = stats
self.last_seen = time.time()
self.job = None
self.frame = None
netrender.model.RenderSlave._slave_map[self.id] = self
def seen(self):
self.last_seen = time.time()
# sorting key for jobs
def groupKey(job):
return (job.status, job.framesLeft() > 0, job.priority, job.credits)
class MRenderJob(netrender.model.RenderJob):
def __init__(self, job_id, name, files, chunks = 1, priority = 1, credits = 100.0, blacklist = []):
super().__init__()
self.id = job_id
self.name = name
self.files = files
self.frames = []
self.chunks = chunks
self.priority = priority
self.credits = credits
self.blacklist = blacklist
self.last_dispatched = time.time()
# special server properties
self.save_path = ""
self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files}
self.status = JOB_WAITING
def save(self):
if self.save_path:
f = open(self.save_path + "job.txt", "w")
f.write(repr(self.serialize()))
f.close()
def testStart(self):
for f in self.files_map.values():
if not f.test():
return False
self.start()
return True
def start(self):
self.status = JOB_QUEUED
def update(self):
self.credits -= 5 # cost of one frame
self.credits += (time.time() - self.last_dispatched) / 60
self.last_dispatched = time.time()
def addFrame(self, frame_number):
frame = MRenderFrame(frame_number)
self.frames.append(frame)
return frame
def framesLeft(self):
total = 0
for j in self.frames:
if j.status == QUEUED:
total += 1
return total
def reset(self, all):
for f in self.frames:
f.reset(all)
def getFrames(self):
frames = []
for f in self.frames:
if f.status == QUEUED:
self.update()
frames.append(f)
if len(frames) >= self.chunks:
break
return frames
class MRenderFrame(netrender.model.RenderFrame):
def __init__(self, frame):
super().__init__()
self.number = frame
self.slave = None
self.time = 0
self.status = QUEUED
def reset(self, all):
if all or self.status == ERROR:
self.slave = None
self.time = 0
self.status = QUEUED
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
class RenderHandler(http.server.BaseHTTPRequestHandler):
def send_head(self, code = http.client.OK, headers = {}):
self.send_response(code)
self.send_header("Content-type", "application/octet-stream")
for key, value in headers.items():
self.send_header(key, value)
self.end_headers()
def do_HEAD(self):
print(self.path)
if self.path == "status":
job_id = self.headers.get('job-id', "")
job_frame = int(self.headers.get('job-frame', -1))
if job_id:
print("status:", job_id, "\n")
job = self.server.getJobByID(job_id)
if job:
if job_frame != -1:
frame = job[frame]
if not frame:
# no such frame
self.send_heat(http.client.NO_CONTENT)
return
else:
# no such job id
self.send_head(http.client.NO_CONTENT)
return
self.send_head()
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def do_GET(self):
print(self.path)
if self.path == "version":
self.send_head()
self.server.stats("", "New client connection")
self.wfile.write(VERSION)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "render":
job_id = self.headers['job-id']
job_frame = int(self.headers['job-frame'])
print("render:", job_id, job_frame)
job = self.server.getJobByID(job_id)
if job:
frame = job[job_frame]
if frame:
if frame.status in (QUEUED, DISPATCHED):
self.send_head(http.client.ACCEPTED)
elif frame.status == DONE:
self.server.stats("", "Sending result back to client")
f = open(job.save_path + "%04d" % job_frame + ".exr", 'rb')
self.send_head()
shutil.copyfileobj(f, self.wfile)
f.close()
elif frame.status == ERROR:
self.send_head(http.client.PARTIAL_CONTENT)
else:
# no such frame
self.send_head(http.client.NO_CONTENT)
else:
# no such job id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "log":
job_id = self.headers['job-id']
job_frame = int(self.headers['job-frame'])
print("log:", job_id, job_frame)
job = self.server.getJobByID(job_id)
if job:
frame = job[job_frame]
if frame:
if frame.status in (QUEUED, DISPATCHED):
self.send_head(http.client.PROCESSING)
else:
self.server.stats("", "Sending log back to client")
f = open(job.save_path + "%04d" % job_frame + ".log", 'rb')
self.send_head()
shutil.copyfileobj(f, self.wfile)
f.close()
else:
# no such frame
self.send_head(http.client.NO_CONTENT)
else:
# no such job id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "status":
job_id = self.headers.get('job-id', "")
job_frame = int(self.headers.get('job-frame', -1))
if job_id:
print("status:", job_id, "\n")
job = self.server.getJobByID(job_id)
if job:
if job_frame != -1:
frame = job[frame]
if frame:
message = frame.serialize()
else:
# no such frame
self.send_heat(http.client.NO_CONTENT)
return
else:
message = job.serialize()
else:
# no such job id
self.send_head(http.client.NO_CONTENT)
return
else: # status of all jobs
message = []
for job in self.server:
message.append(job.serialize())
self.send_head()
self.wfile.write(bytes(repr(message), encoding='utf8'))
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "job":
self.server.update()
slave_id = self.headers['slave-id']
print("slave-id", slave_id)
slave = self.server.updateSlave(slave_id)
if slave: # only if slave id is valid
job, frames = self.server.getNewJob(slave_id)
if job and frames:
for f in frames:
print("dispatch", f.number)
f.status = DISPATCHED
f.slave = slave
self.send_head(headers={"job-id": job.id})
message = job.serialize(frames)
self.wfile.write(bytes(repr(message), encoding='utf8'))
self.server.stats("", "Sending job frame to render node")
else:
# no job available, return error code
self.send_head(http.client.ACCEPTED)
else: # invalid slave id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "file":
slave_id = self.headers['slave-id']
slave = self.server.updateSlave(slave_id)
if slave: # only if slave id is valid
job_id = self.headers['job-id']
job_file = self.headers['job-file']
print("job:", job_id, "\n")
print("file:", job_file, "\n")
job = self.server.getJobByID(job_id)
if job:
render_file = job.files_map.get(job_file, None)
if render_file:
self.server.stats("", "Sending file to render node")
f = open(render_file.path, 'rb')
shutil.copyfileobj(f, self.wfile)
f.close()
else:
# no such file
self.send_head(http.client.NO_CONTENT)
else:
# no such job id
self.send_head(http.client.NO_CONTENT)
else: # invalid slave id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "slave":
message = []
for slave in self.server.slaves:
message.append(slave.serialize())
self.send_head()
self.wfile.write(bytes(repr(message), encoding='utf8'))
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def do_POST(self):
print(self.path)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
if self.path == "job":
print("posting job info")
self.server.stats("", "Receiving job")
length = int(self.headers['content-length'])
job_info = netrender.model.RenderJob.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
job_id = self.server.nextJobID()
print(job_info.files)
job = MRenderJob(job_id, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)
for frame in job_info.frames:
frame = job.addFrame(frame.number)
self.server.addJob(job)
headers={"job-id": job_id}
if job.testStart():
self.send_head(headers=headers)
else:
self.send_head(http.client.ACCEPTED, headers=headers)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "cancel":
job_id = self.headers.get('job-id', "")
if job_id:
print("cancel:", job_id, "\n")
self.server.removeJob(job_id)
else: # cancel all jobs
self.server.clear()
self.send_head()
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "reset":
job_id = self.headers.get('job-id', "")
job_frame = int(self.headers.get('job-frame', "-1"))
all = bool(self.headers.get('reset-all', "False"))
job = self.server.getJobByID(job_id)
if job:
if job_frame != -1:
job[job_frame].reset(all)
else:
job.reset(all)
self.send_head()
else: # job not found
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "slave":
length = int(self.headers['content-length'])
job_frame_string = self.headers['job-frame']
slave_info = netrender.model.RenderSlave.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
self.send_head(headers = {"slave-id": slave_id})
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def do_PUT(self):
print(self.path)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
if self.path == "file":
print("writing blend file")
self.server.stats("", "Receiving job")
length = int(self.headers['content-length'])
job_id = self.headers['job-id']
job_file = self.headers['job-file']
job = self.server.getJobByID(job_id)
if job:
render_file = job.files_map.get(job_file, None)
if render_file:
main_file = job.files[0]
main_path, main_name = os.path.split(main_file)
if job_file != main_file:
file_path = prefixPath(job.save_path, job_file, main_path)
else:
file_path = job.save_path + main_name
buf = self.rfile.read(length)
# add same temp file + renames as slave
f = open(file_path, "wb")
f.write(buf)
f.close()
del buf
render_file.path = file_path # set the new path
if job.testStart():
self.send_head(headers=headers)
else:
self.send_head(http.client.ACCEPTED, headers=headers)
else: # invalid file
self.send_head(http.client.NO_CONTENT)
else: # job not found
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "render":
print("writing result file")
self.server.stats("", "Receiving render result")
slave_id = self.headers['slave-id']
slave = self.server.updateSlave(slave_id)
if slave: # only if slave id is valid
job_id = self.headers['job-id']
job = self.server.getJobByID(job_id)
if job:
job_frame = int(self.headers['job-frame'])
job_result = int(self.headers['job-result'])
job_time = float(self.headers['job-time'])
frame = job[job_frame]
if job_result == DONE:
length = int(self.headers['content-length'])
buf = self.rfile.read(length)
f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
f.write(buf)
f.close()
del buf
elif job_result == ERROR:
# blacklist slave on this job on error
job.blacklist.append(slave.id)
frame.status = job_result
frame.time = job_time
self.server.updateSlave(self.headers['slave-id'])
self.send_head()
else: # job not found
self.send_head(http.client.NO_CONTENT)
else: # invalid slave id
self.send_head(http.client.NO_CONTENT)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "log":
print("writing log file")
self.server.stats("", "Receiving log file")
job_id = self.headers['job-id']
job = self.server.getJobByID(job_id)
if job:
length = int(self.headers['content-length'])
job_frame = int(self.headers['job-frame'])
buf = self.rfile.read(length)
f = open(job.save_path + "%04d" % job_frame + ".log", 'ab')
f.write(buf)
f.close()
del buf
self.server.updateSlave(self.headers['slave-id'])
self.send_head()
else: # job not found
self.send_head(http.client.NO_CONTENT)
class RenderMasterServer(http.server.HTTPServer):
def __init__(self, address, handler_class, path):
super().__init__(address, handler_class)
self.jobs = []
self.jobs_map = {}
self.slaves = []
self.slaves_map = {}
self.job_id = 0
self.path = path + "master_" + str(os.getpid()) + os.sep
if not os.path.exists(self.path):
os.mkdir(self.path)
def nextJobID(self):
self.job_id += 1
return str(self.job_id)
def addSlave(self, name, address, stats):
slave = MRenderSlave(name, address, stats)
self.slaves.append(slave)
self.slaves_map[slave.id] = slave
return slave.id
def getSlave(self, slave_id):
return self.slaves_map.get(slave_id, None)
def updateSlave(self, slave_id):
slave = self.getSlave(slave_id)
if slave:
slave.seen()
return slave
def clear(self):
self.jobs_map = {}
self.jobs = []
def update(self):
self.jobs.sort(key = groupKey)
def removeJob(self, id):
job = self.jobs_map.pop(id)
if job:
self.jobs.remove(job)
def addJob(self, job):
self.jobs.append(job)
self.jobs_map[job.id] = job
# create job directory
job.save_path = self.path + "job_" + job.id + os.sep
if not os.path.exists(job.save_path):
os.mkdir(job.save_path)
job.save()
def getJobByID(self, id):
return self.jobs_map.get(id, None)
def __iter__(self):
for job in self.jobs:
yield job
def getNewJob(self, slave_id):
if self.jobs:
for job in reversed(self.jobs):
if job.status == JOB_QUEUED and job.framesLeft() > 0 and slave_id not in job.blacklist:
return job, job.getFrames()
return None, None
def runMaster(address, broadcast, path, update_stats, test_break):
httpd = RenderMasterServer(address, RenderHandler, path)
httpd.timeout = 1
httpd.stats = update_stats
if broadcast:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
start_time = time.time()
while not test_break():
httpd.handle_request()
if broadcast:
if time.time() - start_time >= 10: # need constant here
print("broadcasting address")
s.sendto(bytes("%s:%i" % address, encoding='utf8'), 0, ('<broadcast>',address[1]))
start_time = time.time()