python demo tree eTag |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-29
@author: wave
'''
import md5
import time
class IDGenerator(object):
    """Hand out numeric IDs built from a prefix, a timestamp and a counter."""

    # Number of IDs handed out so far; shared by every caller in the process.
    maxID = 0

    @classmethod
    def genID(cls, moduleID="0000", agentID="0000", treeId="0000"):
        """Return a new integer ID.

        The ID is the decimal number spelled by concatenating ``moduleID``,
        ``agentID``, ``treeId``, the local time as ``YYYYmmddHHMMSS`` and the
        call counter zero-padded to at least four digits.  The three prefix
        arguments must be strings of decimal digits; leading zeros disappear
        when the text is converted to ``int``.
        """
        cls.maxID += 1
        stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        return int("".join((moduleID, agentID, treeId, stamp, "%04d" % cls.maxID)))
class ETagGenerator(object):
    """Hand out increasing numeric eTags used to order changes."""

    # Number of eTags handed out so far; shared by every caller in the process.
    maxID = 0

    @classmethod
    def genETag(cls, moduleID="0000", agentID="0000", treeId="0000"):
        """Return a new integer eTag.

        The value is the decimal number spelled by concatenating the three
        digit-string prefixes, the local time as ``YYYYmmddHHMMSS`` and the
        call counter zero-padded to at least four digits.
        """
        cls.maxID += 1
        stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        return int("".join((moduleID, agentID, treeId, stamp, "%04d" % cls.maxID)))
class Node(object):
    """A folder in the tree, linked to its parent by ID."""

    def __init__(self, nodeID, parentNodeID, data, eTag):
        self.parentNodeID = parentNodeID  # None for the root node
        self.nodeID = nodeID
        self.data = data  # payload shown when the node is printed
        self.eTag = eTag  # change marker, replaced on every update

    def __str__(self):
        """Return ``"<data>.<eTag>"``."""
        return "{0}.{1}".format(self.data, self.eTag)
class Leaf(object):
    """A file entry in the tree, linked to its parent folder by ID."""

    def __init__(self, leafID, parentNodeID, data, dataSize, eTag):
        self.parentNodeID = parentNodeID
        self.leafID = leafID
        self.data = data  # payload shown when the leaf is printed
        self.dataSize = dataSize
        self.eTag = eTag  # change marker, replaced on every update

    def __str__(self):
        """Return ``"<data>.<eTag>"``."""
        return "{0}.{1}".format(self.data, self.eTag)
class Tree(object):
    """In-memory tree of folders (``Node``) and files (``Leaf``) stamped with eTags.

    Nodes and leaves live in flat dictionaries keyed by ID and refer to their
    parent by ID.  File contents are stored once per MD5 digest in
    ``leafData``; ``leafMap`` records which digest each leaf points at.
    Mutations stamp the touched objects with a fresh eTag so that
    ``syncTree`` can report what changed after a given token.
    """

    def __init__(self):
        self.rootNode = None
        self.currentNodes = {}  # nodeID -> Node
        self.currentLeafs = {}  # leafID -> Leaf
        self.leafData = {}  # upper-case MD5 hex digest -> file content
        self.leafMap = {}  # leafID -> digest key into leafData

    @staticmethod
    def _digest(fileData):
        """Return the upper-case MD5 hex digest of ``fileData``."""
        # hashlib replaces the ``md5`` module, which no longer exists on
        # Python 3; imported here so the class does not depend on new
        # file-level imports.
        import hashlib
        if not isinstance(fileData, bytes):
            fileData = fileData.encode("utf-8")
        return hashlib.md5(fileData).hexdigest().upper()

    def _genNewNode(self, newNodeData, parentNodeID):
        """Create, register and return a node under ``parentNodeID``."""
        nodeID = IDGenerator.genID()
        newNode = Node(nodeID, parentNodeID, newNodeData, ETagGenerator.genETag())
        self.currentNodes[nodeID] = newNode
        return newNode

    def _genNewLeaf(self, leafInfo, parentNodeID):
        """Create, register and return a leaf under ``parentNodeID``.

        ``leafInfo`` needs ``"fileName"`` plus either ``"fileDigest"`` (the
        content must already be stored in the tree) or ``"fileData"`` (the
        content is stored unless an identical copy already is).

        Raises ``Exception`` when a digest is given that the tree does not hold.
        """
        if "fileDigest" in leafInfo:
            md5key = leafInfo["fileDigest"].upper()
            if md5key not in self.leafData:
                raise Exception("file not exists in tree")
            dataSize = len(self.leafData[md5key])
        else:
            fileData = leafInfo["fileData"]
            md5key = self._digest(fileData)
            # Identical contents share one stored copy.
            self.leafData.setdefault(md5key, fileData)
            dataSize = len(fileData)
        leafID = IDGenerator.genID()
        newLeaf = Leaf(leafID, parentNodeID, leafInfo["fileName"], dataSize,
                       ETagGenerator.genETag())
        self.currentLeafs[leafID] = newLeaf
        self.leafMap[leafID] = md5key
        return newLeaf

    def _getNode(self, nodeID):
        """Return the node with ``nodeID`` or raise ``ValueError``."""
        if nodeID in self.currentNodes:
            return self.currentNodes[nodeID]
        raise ValueError("no node found, {0}".format(nodeID))

    def _getLeaf(self, leafID):
        """Return the leaf with ``leafID`` or raise ``ValueError``."""
        if leafID in self.currentLeafs:
            return self.currentLeafs[leafID]
        raise ValueError("no leaf found, {0}".format(leafID))

    def _getSubNodes(self, nodeID):
        """Return the nodes whose parent is ``nodeID`` (scans every node)."""
        return [node for node in self.currentNodes.values()
                if node.parentNodeID == nodeID]

    def _getSubLeafs(self, nodeID):
        """Return the leaves whose parent is ``nodeID`` (scans every leaf)."""
        return [leaf for leaf in self.currentLeafs.values()
                if leaf.parentNodeID == nodeID]

    def addNode(self, newNodeData, targetNodeID=None):
        """Add a folder and return its ID.

        The first node added (with no target) becomes the root.  Afterwards a
        missing ``targetNodeID`` means "under the root".  Returns ``None``
        when a target is named while the tree has no root yet.
        """
        if self.rootNode is None:
            if targetNodeID is not None:
                # Nothing may be added below a parent before a root exists.
                return None
            self.rootNode = self._genNewNode(newNodeData, None)
            return self.rootNode.nodeID
        if targetNodeID is None:
            parentNodeID = self.rootNode.nodeID
        else:
            parentNodeID = self._getNode(targetNodeID).nodeID
        return self._genNewNode(newNodeData, parentNodeID).nodeID

    def addLeaf(self, leafData, targetNodeID):
        """Add a file under ``targetNodeID``, refresh that folder's eTag, return the leaf ID."""
        parentNode = self._getNode(targetNodeID)
        newLeaf = self._genNewLeaf(leafData, parentNode.nodeID)
        parentNode.eTag = ETagGenerator.genETag()
        return newLeaf.leafID

    def updateNode(self, nodeID, newNodeData):
        """Replace a folder's data and give it a fresh eTag."""
        node = self._getNode(nodeID)
        node.data = newNodeData
        node.eTag = ETagGenerator.genETag()

    def updateLeaf(self, leafID, newLeafInfo):
        """Rename a file and/or replace its content.

        ``newLeafInfo`` may carry ``"fileName"`` and/or ``"fileData"``.  The
        leaf and its parent folder both receive fresh eTags.
        """
        leaf = self._getLeaf(leafID)
        leaf.eTag = ETagGenerator.genETag()
        if "fileName" in newLeafInfo:
            leaf.data = newLeafInfo["fileName"]
        if "fileData" in newLeafInfo:
            fileData = newLeafInfo["fileData"]
            md5key = self._digest(fileData)
            self.leafData.setdefault(md5key, fileData)
            self.leafMap[leafID] = md5key
        node = self._getNode(leaf.parentNodeID)
        node.eTag = ETagGenerator.genETag()

    def gcLeafData(self):
        """Print every stored content that no leaf refers to any more.

        Nothing is deleted; the method only reports.
        """
        # Build the lookup once instead of rescanning leafMap per digest.
        referenced = set(self.leafMap.values())
        for md5Key, fileData in self.leafData.items():
            if md5Key not in referenced:
                print("%s is no more refered by any leaf, can be delete. fileData is %s"
                      % (md5Key, fileData))

    def deepCopyNodes(self, sourceNodeIDs, targetNodeID):
        """Copy each source folder, with everything below it, under ``targetNodeID``.

        The source subtree is captured into detached records before any copy
        is created, so copying a folder into one of its own descendants
        terminates.  Files are copied by digest, i.e. without duplicating
        stored content.  Within a folder, files are copied smallest first and
        sub-folders in order of the summed size of their direct files.
        """
        class SourceNode(object):
            """Detached record of one source folder."""

            def __init__(self, nodeID, nodeName):
                self.nodeID = nodeID
                self.nodeName = nodeName
                self.totalSize = 0  # summed size of the folder's direct leaves
                self.subNodes = []

            def __str__(self):
                return "{0}.{1}".format(self.nodeName, self.nodeID)

        def capture(node, leafs):
            """Record ``node`` recursively; fill ``leafs`` with nodeID -> sorted leaf IDs."""
            sourceNode = SourceNode(node.nodeID, node.data)
            subLeafs = self._getSubLeafs(node.nodeID)
            if subLeafs:
                subLeafs.sort(key=lambda leaf: leaf.dataSize)
                leafs[node.nodeID] = [leaf.leafID for leaf in subLeafs]
                sourceNode.totalSize = sum(leaf.dataSize for leaf in subLeafs)
            for subNode in self._getSubNodes(sourceNode.nodeID):
                sourceNode.subNodes.append(capture(subNode, leafs))
            return sourceNode

        def copyLeafs(leafIDs, newParentID):
            for leafID in leafIDs:
                self._genNewLeaf({"fileName": self._getLeaf(leafID).data,
                                  "fileDigest": self.leafMap[leafID]},
                                 newParentID)

        def deepCopy(sourceNode, parentNode, leafs):
            newNode = self._genNewNode(sourceNode.nodeName, parentNode.nodeID)
            if sourceNode.nodeID in leafs:
                copyLeafs(leafs[sourceNode.nodeID], newNode.nodeID)
            sourceNode.subNodes.sort(key=lambda sub: sub.totalSize)
            for subNode in sourceNode.subNodes:
                deepCopy(subNode, newNode, leafs)
            # Stamp the folder last so its eTag is newer than its content's.
            newNode.eTag = ETagGenerator.genETag()

        targetNode = self._getNode(targetNodeID)
        for sourceNodeID in sourceNodeIDs:
            leafs = {}
            rootSourceNode = capture(self._getNode(sourceNodeID), leafs)
            deepCopy(rootSourceNode, targetNode, leafs)

    def levelCopyNodes(self, sourceNodeIDs, targetNodeID):  # memory use is not considered
        """Copy each source folder under ``targetNodeID`` one tree level at a time.

        All folders are created first (level by level), then all files, and
        finally every new folder gets a fresh eTag.
        """
        class SourceNode(object):
            """Detached record of one source folder."""

            def __init__(self, nodeID, parentNodeID, nodeName):
                self.nodeID = nodeID
                self.parentNodeID = parentNodeID
                self.nodeName = nodeName

            def __str__(self):
                return "{0}.{1}".format(self.nodeName, self.nodeID)

        def collectLevels(rootNode):
            """Return (list of levels, {source leafID: source parent nodeID})."""
            leafs = {}
            levels = [[SourceNode(rootNode.nodeID, None, rootNode.data)]]
            while True:
                nextLevel = []
                for source in levels[-1]:
                    for subLeaf in self._getSubLeafs(source.nodeID):
                        leafs[subLeaf.leafID] = source.nodeID
                    for subNode in self._getSubNodes(source.nodeID):
                        nextLevel.append(
                            SourceNode(subNode.nodeID, subNode.parentNodeID, subNode.data))
                if not nextLevel:
                    return levels, leafs
                levels.append(nextLevel)

        targetNode = self._getNode(targetNodeID)
        for sourceNodeID in sourceNodeIDs:
            levels, leafs = collectLevels(self._getNode(sourceNodeID))
            nodeMap = {}  # source nodeID -> freshly created copy
            for depth, level in enumerate(levels):
                for source in level:
                    if depth == 0:
                        parentID = targetNode.nodeID
                    else:
                        parentID = nodeMap[source.parentNodeID].nodeID
                    nodeMap[source.nodeID] = self._genNewNode(source.nodeName, parentID)
            for sourceLeafID, sourceParentNodeID in leafs.items():
                self._genNewLeaf({"fileName": self._getLeaf(sourceLeafID).data,
                                  "fileDigest": self.leafMap[sourceLeafID]},
                                 nodeMap[sourceParentNodeID].nodeID)
            for node in nodeMap.values():
                node.eTag = ETagGenerator.genETag()

    def printTree(self):
        """Print the tree depth-first; leaves show the digest they point at."""
        def printNode(nodeID, depth=0):
            node = self._getNode(nodeID)
            print("|" + "-" * depth + str(node))
            for subLeaf in self._getSubLeafs(nodeID):
                print("|" + "-" * (depth + 1) + str(subLeaf) + " --> "
                      + self.leafMap[subLeaf.leafID])
            for subNode in self._getSubNodes(nodeID):
                printNode(subNode.nodeID, depth + 1)

        printNode(self.rootNode.nodeID, 1)

    def syncTree(self, syncToken=None, pageSize=2):
        """Print and return one page of changes newer than ``syncToken``.

        Folders with an eTag above the token are sorted newest first and the
        last ``pageSize`` of them (the oldest ones) are kept.  The direct
        files of those folders with an eTag above the token are paged the
        same way.  Returns ``(updatedNodes, updatedLeafs)``; both lists are
        in descending eTag order.  An empty tree yields two empty lists.
        """
        if syncToken is None:
            syncToken = 0
        if self.rootNode is None:
            return [], []

        updatedNodes = []

        def syncNode(node):
            if node.eTag > syncToken:
                updatedNodes.append(node)
            for subNode in self._getSubNodes(node.nodeID):
                syncNode(subNode)

        syncNode(self.rootNode)
        updatedNodes.sort(key=lambda node: node.eTag, reverse=True)
        updatedNodes = updatedNodes[-pageSize:]

        updatedLeafs = []
        for node in updatedNodes:
            for leaf in self._getSubLeafs(node.nodeID):
                if leaf.eTag > syncToken:
                    updatedLeafs.append(leaf)
        updatedLeafs.sort(key=lambda leaf: leaf.eTag, reverse=True)
        updatedLeafs = updatedLeafs[-pageSize:]

        # Open question left by the author about paging reliability: the
        # newest folder eTag is always above the newest file eTag, so will the
        # files always be picked up first because of their eTag?
        for node in updatedNodes:
            print(node)
        for leaf in updatedLeafs:
            print(leaf)
        return updatedNodes, updatedLeafs
if __name__ == '__main__':  # versions are ignored here; only eTag updates are exercised
    import hashlib

    def drainSync(tree, eTag):
        """Page through ``tree.syncTree`` until nothing newer than ``eTag`` is left.

        Returns the last token used so the next round can resume from it.
        """
        while True:
            # TODO: when nodes and leafs both come back shorter than a full
            # page the sync is already finished, so one query could be saved.
            nodes, leafs = tree.syncTree(eTag)
            if not nodes:
                return eTag
            eTag = nodes[0].eTag
            if leafs and leafs[0].eTag:
                # Always resume from the largest eTag among the returned files.
                eTag = leafs[0].eTag
            print("")

    tree = Tree()
    root = tree.addNode("root")
    folder1 = tree.addNode("folder1", root)
    folder2 = tree.addNode("folder2", root)
    folder3 = tree.addNode("folder3", root)
    folder11 = tree.addNode("folder11", folder1)
    folder12 = tree.addNode("folder12", folder1)
    folder21 = tree.addNode("folder21", folder2)
    folder22 = tree.addNode("folder22", folder2)
    eTag = drainSync(tree, 0)

    # Update nodes.
    tree.updateNode(folder22, "folder22 new Node data")
    tree.updateNode(folder12, "folder12 new Node data")
    eTag = drainSync(tree, eTag)

    folder13 = tree.addNode("folder13", folder1)
    leaf1 = tree.addLeaf({"fileName": "file1", "fileData": "1" * 32}, folder1)
    leaf2 = tree.addLeaf({"fileName": "file2", "fileData": "2" * 32}, folder1)
    leaf3 = tree.addLeaf({"fileName": "file3", "fileData": "1" * 32}, folder1)
    # file4 is added by digest only: its content already exists as file2's.
    leaf4 = tree.addLeaf(
        {"fileName": "file4",
         "fileDigest": hashlib.md5(("2" * 32).encode("utf-8")).hexdigest().upper()},
        folder1)
    # tree.updateLeaf(leaf1, {"fileData":"2"*32})
    tree.updateLeaf(leaf3, {"fileData": "2" * 32})
    tree.gcLeafData()
    eTag = drainSync(tree, eTag)

    # Optional demos, disabled by the author:
    # tree.levelCopyNodes([root], folder11)
    # eTag = drainSync(tree, eTag)
    # tree.deepCopyNodes([root], folder12)
    # eTag = drainSync(tree, eTag)
    tree.printTree()
|
lxml xml to dict |
|
|
from lxml import etree
def recursive_dict(element):
    """Convert an XML element into a ``(tag, value)`` pair.

    ``value`` is a dict of the element's children keyed by tag when it has
    children, otherwise the element's text.  Children that share a tag are
    gathered into a list in document order.
    """
    children = {}
    for child in element:  # an element iterates over its direct children
        tag, value = recursive_dict(child)
        if tag not in children:
            children[tag] = value
        elif isinstance(children[tag], list):
            children[tag].append(value)
        else:
            # Second occurrence of a tag: promote the entry to a list.
            children[tag] = [children[tag], value]
    return element.tag, children or element.text
def xml2dict(xmlStr):
    """Parse ``xmlStr`` with lxml and return the root element's value.

    The result is whatever ``recursive_dict`` yields for the root: a dict of
    its children, or its text when it has none.  The root tag is dropped.
    """
    rootElement = etree.fromstring(xmlStr)
    _, value = recursive_dict(rootElement)
    return value
if __name__ == "__main__":
    # Sample document covering plain text, repeated tags and nesting.
    xmlStr = """<root>
<a>aValue</a>
<b>
<item>item1</item>
<item>item2</item>
<item>item2</item>
</b>
<c>
<item1>
<iitem>xxx</iitem>
</item1>
<item2>item2</item2>
</c>
</root>"""
    print(xml2dict(xmlStr))
|
python chroot |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-25
@author: wave
'''
import os
import subprocess
import shutil
def which(eFile=None):
    """Locate an executable the way a shell would.

    Returns ``eFile`` itself when it already names an executable file,
    otherwise the first match found along ``$PATH`` (or a built-in default
    path when ``$PATH`` is unset).  Returns ``None`` when nothing is found
    or ``eFile`` is empty.
    """
    if not eFile:
        return None

    def isExecutable(path):
        # The executable bit alone is also set on directories, so require a
        # regular file as well.
        return os.path.isfile(path) and os.access(path, os.X_OK)

    if isExecutable(eFile):
        return eFile
    default_path = "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin"
    for path in os.environ.get('PATH', default_path).split(os.pathsep):
        full_path = os.path.join(path, eFile)
        if isExecutable(full_path):
            return full_path
    return None
def _run(cmd, cwd=None,
         stdout=subprocess.PIPE,
         stderr=subprocess.PIPE,
         env=(), rstrip=True, retcode=False
         ):
    """Run ``cmd`` through the user's shell and collect the outcome.

    cmd     -- command line, interpreted by ``$SHELL`` (``/bin/sh`` if unset).
    cwd     -- working directory; defaults to the home directory, or ``/``
               when that is not readable.
    stdout, stderr -- passed to ``subprocess.Popen``.
    env     -- extra environment variables for the child (mapping or pairs).
    rstrip  -- strip trailing whitespace from captured output.
    retcode -- when true, output is not captured at all; only the exit code
               is of interest.

    Returns a dict with ``stdout``, ``stderr``, ``pid`` and ``retcode``.
    Captured output is whatever ``communicate()`` yields (``None`` for a
    stream that was not piped).
    """
    if not cwd:
        cwd = os.path.expanduser('~')
        if not os.access(cwd, os.R_OK):
            cwd = '/'
    # Work on a copy: updating os.environ itself would leak ``env`` into this
    # process and into every later child.
    run_env = os.environ.copy()
    run_env.update(env)
    kwargs = {'cwd': cwd,
              'shell': True,
              'env': run_env,
              'stdout': stdout,
              'stderr': stderr,
              'executable': os.environ.get('SHELL', '/bin/sh'),
              'close_fds': True}
    if retcode:
        kwargs['stdout'] = None
        kwargs['stderr'] = None
    proc = subprocess.Popen(cmd, **kwargs)
    out, err = proc.communicate()
    if rstrip:
        if out is not None:
            out = out.rstrip()
        if err is not None:
            err = err.rstrip()
    return {'stdout': out,
            'stderr': err,
            'pid': proc.pid,
            'retcode': proc.returncode}
def _first_token(text):
    """Return the first whitespace-separated word of ``text`` ('' if none)."""
    words = text.split()
    return words[0] if words else ""


def _parse_ldd_output(output):
    """Turn the text printed by ``ldd`` into a list of references.

    A line of the form ``name => target ...`` becomes the tuple
    ``(name, target)`` where ``name`` is cut to start at its last ``lib``
    (left whole when it contains none).  Any other non-blank line
    contributes its first word.
    """
    result = []
    for line in output.split("\n"):
        line = line.strip()
        if not line:
            continue
        if "=>" in line:
            left, _, right = line.partition("=>")
            left = left.strip()
            libIndex = left.rfind("lib")
            if libIndex != -1:
                left = left[libIndex:]
            result.append((left, _first_token(right)))
        else:
            result.append(_first_token(line))
    return result


def ldd(eFile):
    """Run ``ldd`` on ``eFile`` and return the libraries it reports.

    See ``_parse_ldd_output`` for the shape of the result.  stderr is merged
    into stdout, so error text from ``ldd`` is parsed like any other line.
    """
    output = _run(('ldd %s' % eFile), stderr=subprocess.STDOUT)['stdout']
    if not isinstance(output, str):
        # Python 3 pipes deliver bytes.
        output = output.decode()
    return _parse_ldd_output(output)
def copy2Target(targetRootPath, refFiles):
    """Copy referenced files into a directory tree rooted at ``targetRootPath``.

    ``refFiles`` holds path strings and/or pairs of them (both members of a
    pair are copied).  The destination sub-directory is chosen from the
    source path's prefix; anything unrecognised lands in ``home``.  Missing
    sources are reported on stdout and skipped; empty names are ignored.
    """
    # Absolute source prefix -> directory below the target root.
    prefixTargets = (
        ("/usr/lib64/", "/lib64"),
        ("/usr/lib/", "/lib"),
        ("/lib64/", "/lib64"),
        ("/lib/", "/lib"),
        ("/usr/bin/", "/usr/bin"),
        ("/bin/", "/bin"),
    )

    def cp(sourceFile, targetPath):
        print("copy %s to %s" % (sourceFile, targetPath))
        if not os.path.exists(sourceFile):
            print("%s is not exist" % sourceFile)
            return
        if not os.path.exists(targetPath):
            os.makedirs(targetPath)
        shutil.copy2(sourceFile, targetPath)

    def copyFile(refFile):
        if not refFile:
            return
        for prefix, subDir in prefixTargets:
            if refFile.startswith(prefix):
                cp(refFile, targetRootPath + subDir)
                return
        if refFile.startswith("lib"):
            # Bare library name: look it up in the system library dirs.
            if os.path.exists("/lib/" + refFile):
                cp("/lib/" + refFile, targetRootPath + "/lib")
            elif os.path.exists("/usr/lib/" + refFile):
                cp("/usr/lib/" + refFile, targetRootPath + "/usr/lib")
            return
        cp(refFile, targetRootPath + "/home")

    for refFile in refFiles:
        if isinstance(refFile, (tuple, list)):
            copyFile(refFile[0])
            copyFile(refFile[1])
        else:
            copyFile(refFile)
def makeRoot(targetRootPath="."):
    """Collect the shared libraries of the chroot's commands under ``targetRootPath``.

    For each command the libraries reported by ``ldd`` are printed and
    copied.  A command that cannot be found is reported and skipped.
    """
    for command in ["csh"]:
        fullPath = which(command)
        if fullPath is None:
            # Without this guard ldd would be run on the literal text "None".
            print("%s is not exist" % command)
            continue
        refFiles = ldd(fullPath)
        print(refFiles)
        copy2Target(targetRootPath, refFiles)


makeRoot()
|
python tail call optimize |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-26
@author: wave
'''
import sys
class TailRecurseException(BaseException):
    """Carries the arguments of a tail call back up to the trampoline.

    Derives from ``BaseException`` so that it can be raised on Python 3 and
    is not swallowed by an ``except Exception`` inside the decorated function.
    """

    def __init__(self, args, kwargs):
        BaseException.__init__(self)
        self.args = args
        self.kwargs = kwargs


def tail_call_optimized(g):
    """Decorator that runs a tail-recursive function without growing the stack.

    When the wrapper notices it was called by ``g`` which in turn was called
    by the wrapper, it raises ``TailRecurseException``; the outer wrapper
    catches it and calls ``g`` again with the new arguments.  Only correct
    for functions whose recursive call is in tail position.
    """
    targetCode = getattr(g, "__code__", None)

    def func(*args, **kwargs):
        f = sys._getframe()
        caller = f.f_back
        outer = caller.f_back if caller is not None else None
        # Every wrapper produced by this decorator shares one code object, so
        # also require that the direct caller is ``g`` itself.  Otherwise a
        # decorated function calling a *different* decorated function would
        # be mistaken for recursion and restarted forever.
        if (outer is not None and outer.f_code == f.f_code
                and (targetCode is None or caller.f_code is targetCode)):
            raise TailRecurseException(args, kwargs)
        while 1:
            try:
                return g(*args, **kwargs)
            except TailRecurseException as e:
                args = e.args
                kwargs = e.kwargs

    func.__doc__ = g.__doc__
    func.__name__ = getattr(g, "__name__", func.__name__)
    return func
@tail_call_optimized
def factorial(n, acc=1):
    """Return ``n! * acc`` using an accumulator so the recursion is a tail call."""
    if n == 0:
        return acc
    return factorial(n - 1, n * acc)


# Far deeper than the default recursion limit; works only via the decorator.
# NOTE(review): recent Python 3 releases cap int-to-str conversion length, so
# printing a result this large may need the limit raised first - confirm.
print(factorial(10000))
@tail_call_optimized
def fib(i, currentV=0, nextV=1):
    """Return the ``i``-th Fibonacci number (``fib(0) == 0``) via tail recursion."""
    if i == 0:
        return currentV
    return fib(i - 1, nextV, currentV + nextV)


print(fib(10000))
if __name__ == '__main__':
    pass
|
AOP in python |
|
|
from contextlib import contextmanager
@contextmanager
def tag(name):
    """Context manager that prints ``<name>`` on entry and ``</name>`` on exit.

    The closing tag is not printed when the body raises.
    """
    print("<%s>" % name)
    yield
    # The closing-tag literal had been reduced to "" (which makes the "%"
    # formatting raise TypeError); restored to the matching end tag.
    print("</%s>" % name)


with tag("h1"):
    print("foo")
class test(object):
    """Minimal hand-written context manager used by the demo below it."""

    def __enter__(self):
        """Announce entry; the returned ``1`` is what ``as`` binds."""
        print("enter")
        return 1

    def __exit__(self, *args):
        """Announce exit; returns ``None`` so exceptions propagate."""
        print("exit")
def function():
    """Print a marker line and return ``111``."""
    print('function()')
    return 111
# ``t`` is bound to the value returned by ``test.__enter__``.
with test() as t:
    function()
    print('t is {0}'.format(t))
|
demoTree |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-21
@author: wave
'''
import random
import time
class IDGenerator(object):
    """Hand out consecutive integer IDs, starting at 1."""

    # Highest ID handed out so far; shared by every caller in the process.
    maxID = 0

    def __init__(self):
        pass

    @classmethod
    def genID(cls):
        """Return the next unused ID."""
        cls.maxID += 1
        return cls.maxID
class Node(object):
    """One version of a tree node, linked to parent and children by reference."""

    def __init__(self, nodeID, version, data):
        self.parentNode = None
        self.subNodes = {}  # child nodeID -> Node
        self.leafs = []
        self.nodeID = nodeID
        self.version = version  # tree version in which this copy was created
        self.data = data

    def __str__(self):
        """Return ``"<data>.v<version>"``."""
        return "{0}.v{1}".format(self.data, self.version)
class Leaf(object):
    """Placeholder for file entries; not implemented yet."""

    def __init__(self, data):
        # ``data`` is accepted but currently not stored.
        pass
class Tree(object):
    """Versioned tree: changing a node first copies it and its ancestors.

    ``currentNodes`` maps a node ID to all versions of that node, newest
    first.  ``snapshot()`` freezes the current root and starts a new version;
    the first change to a node in a new version creates fresh copies of the
    node's ancestors up to the root (the author calls this copy on write),
    so earlier snapshots keep their shape.  ``parentNode`` links always
    refer to the newest tree.
    """

    def __init__(self, treeName="trunck"):
        self.treeName = treeName
        self.rootNode = None
        self.version = 0
        self.snapshots = []  # frozen roots, newest first
        self.currentNodes = {}  # nodeID -> [Node, ...], newest first
        self.tags = []

    def _getVersionNode(self, nodeID, version=None):
        """Return the newest copy of ``nodeID``, or the copy made in ``version``.

        Raises ``ValueError`` when no such node (version) exists.
        """
        versionNodes = self.currentNodes.get(nodeID)
        if versionNodes:
            # ``is None`` rather than truthiness: version 0 is a real version.
            if version is None:
                return versionNodes[0]
            for versionNode in versionNodes:
                if versionNode.version == version:
                    return versionNode
        raise ValueError("no node found, {0}.v{1}".format(nodeID, version))

    def _genNewNode(self, newNodeData, nodeID=None):
        """Create a node in the current version and register it as the newest copy.

        A missing ``nodeID`` allocates a new one; an existing ID adds a new
        version of that node.
        """
        if nodeID is None:
            nodeID = IDGenerator.genID()
        newNode = Node(nodeID, self.version, newNodeData)
        self.currentNodes.setdefault(nodeID, []).insert(0, newNode)
        return newNode

    def addNode(self, newNodeData, targetNodeID=None, printTime=False):
        """Add a node below ``targetNodeID`` (default: the root) and return its ID.

        The first node added (with no target) becomes the root; naming a
        target while no root exists returns ``None``.  With ``printTime`` the
        time spent on the parent lookup and on node creation is printed.
        """
        if self.rootNode is None:
            if targetNodeID is not None:
                # Nothing may be added below a parent before a root exists.
                return None
            self.rootNode = self._genNewNode(newNodeData)
            return self.rootNode.nodeID

        started = time.time()
        if targetNodeID is None:
            parentNode = self.rootNode
        else:
            parentNode = self._getVersionNode(targetNodeID)
        if printTime:
            print(" time cost {0}".format(time.time() - started))

        started = time.time()
        newNode = self._genNewNode(newNodeData)
        if printTime:
            print(" time cost {0}".format(time.time() - started))

        # Re-version every ancestor that still belongs to an older version.
        nodesUpdated = {}
        self._updateParents(parentNode, nodesUpdated)
        parentNode = nodesUpdated.get(parentNode, parentNode)
        newNode.parentNode = parentNode
        parentNode.subNodes[newNode.nodeID] = newNode
        return newNode.nodeID

    def _updateParents(self, node, nodesUpdated):
        """Create current-version copies of ``node`` and its outdated ancestors.

        ``nodesUpdated`` is filled with ``old node -> new copy``.  The copies
        are wired to each other and inherit the old nodes' children; the
        root reference is moved when the root itself was copied.
        """
        while node and (node.version != self.version):
            nodesUpdated[node] = self._genNewNode(node.data, node.nodeID)
            node = node.parentNode
        # Adjust the parent pointers of the copies.
        for node, updatedNode in nodesUpdated.items():
            updatedNode.parentNode = nodesUpdated.get(node.parentNode, node.parentNode)
        for node, updatedNode in nodesUpdated.items():
            for subNode in node.subNodes.values():
                replacement = nodesUpdated.get(subNode, subNode)
                updatedNode.subNodes[replacement.nodeID] = replacement
                # Point children at the new parent.  The attribute used to be
                # misspelled ("parnetNode"), which left unchanged children
                # attached to the stale parent and made later edits create
                # duplicate, unreachable copies.
                subNode.parentNode = updatedNode
        if self.rootNode in nodesUpdated:
            self.rootNode = nodesUpdated[self.rootNode]

    def update(self, targetNodeID, newNodeData):
        """Replace the data of ``targetNodeID``.

        A node already at the current version is changed in place (returns
        ``None``).  Otherwise a new version of the node is created, its
        ancestors are re-versioned, and the node ID is returned.  Does
        nothing when the tree is empty.
        """
        if self.rootNode is None:
            return None
        targetNode = self._getVersionNode(targetNodeID)
        if targetNode.version == self.version:
            targetNode.data = newNodeData
            return None

        newNode = self._genNewNode(newNodeData, targetNode.nodeID)
        oldParent = targetNode.parentNode
        nodesUpdated = {}
        self._updateParents(oldParent, nodesUpdated)
        newParent = nodesUpdated.get(oldParent, oldParent)
        newNode.parentNode = newParent
        if newParent is None:
            # The root itself is being updated: there is no parent to attach to.
            self.rootNode = newNode
        else:
            newParent.subNodes[newNode.nodeID] = newNode
        # The children move over to the new version of their parent.
        for subNode in targetNode.subNodes.values():
            newNode.subNodes[subNode.nodeID] = subNode
            subNode.parentNode = newNode
        return newNode.nodeID

    def deleteNode(self, targetNodeID):
        """Detach ``targetNodeID`` from the current version of the tree.

        The ancestors are re-versioned first so older snapshots still contain
        the node.  Does nothing on an empty tree.  Raises ``ValueError`` for
        the root node, which has no parent to be detached from.
        """
        if self.rootNode is None:
            return
        targetNode = self._getVersionNode(targetNodeID)
        oldParent = targetNode.parentNode
        if oldParent is None:
            raise ValueError("cannot delete root node, {0}".format(targetNodeID))
        nodesUpdated = {}
        self._updateParents(oldParent, nodesUpdated)
        # Drop the entry from the current-version parent only.
        del nodesUpdated.get(oldParent, oldParent).subNodes[targetNode.nodeID]

    def _fixParentNode(self):
        """Re-derive every ``parentNode`` link from the current root downwards.

        Parent links always follow the newest tree, so after the root has
        been moved back to an older snapshot they must be rebuilt.
        """
        if self.rootNode is None:
            return
        self.rootNode.parentNode = None
        pending = [self.rootNode]
        while pending:
            node = pending.pop()
            for subNode in node.subNodes.values():
                subNode.parentNode = node
                pending.append(subNode)

    def snapshot(self):
        """Freeze the current root and start a new version.

        Does nothing when no node was created or changed in the current version.
        """
        changed = any(versionNodes and versionNodes[0].version == self.version
                      for versionNodes in self.currentNodes.values())
        if not changed:
            return
        self.snapshots.insert(0, self.rootNode)
        self.version += 1

    def resvert(self, version=None):
        """Go back to snapshot ``version``, or drop the current version's changes.

        A tag is taken first because reverting discards nodes.  With a
        version, newer snapshots and newer node copies are removed and the
        snapshot with that version becomes the root.  Without one, the node
        copies made in the current version are removed and the root falls
        back to the latest snapshot (``None`` when there is none).
        """
        self.tag()
        if version is not None:
            for snap in self.snapshots:
                if snap.version == version:
                    self.rootNode = snap
                    self.version = snap.version
            self.snapshots[:] = [snap for snap in self.snapshots
                                 if snap.version <= version]
            for nodeID in list(self.currentNodes):
                versionNodes = self.currentNodes[nodeID]
                versionNodes[:] = [node for node in versionNodes
                                   if node.version <= version]
                if not versionNodes:
                    # The node did not exist yet in that version.
                    del self.currentNodes[nodeID]
        else:
            for nodeID in list(self.currentNodes):
                versionNodes = self.currentNodes[nodeID]
                # A current-version copy can only sit at position 0.
                if versionNodes and versionNodes[0].version == self.version:
                    del versionNodes[0]
                if not versionNodes:
                    del self.currentNodes[nodeID]
            if self.rootNode is not None and self.rootNode.version == self.version:
                self.rootNode = self.snapshots[0] if self.snapshots else None
        self._fixParentNode()

    def tag(self):  # possibly not the same meaning as an svn tag
        """Append a backup of the root, the snapshots and the node index to ``tags``."""
        class Tag(object):
            def __init__(self, tagName, rootNode, snapshots, currentNodes):
                self.tagName = tagName
                self.rootNode = rootNode
                self.snapshots = snapshots
                self.currentNodes = currentNodes

        # Copy the per-node lists as well: a later revert edits them in place,
        # which would otherwise empty the backup too.
        tagCurrentNodes = dict((nodeID, list(versionNodes))
                               for nodeID, versionNodes in self.currentNodes.items())
        tagSnapshots = list(self.snapshots)
        self.tags.append(Tag("tag{0}".format(len(self.tags)), self.rootNode,
                             tagSnapshots, tagCurrentNodes))

    def branch(self):
        """Return an independent copy of this tree named ``"branch"``."""
        branch = Tree("branch")
        self._copyTree(branch, self)
        return branch

    def _copyTree(self, targetTree, originalTree):
        """Copy every node version, link and snapshot of ``originalTree`` into ``targetTree``.

        Raises ``ValueError`` when the original root is not among its nodes.
        """
        newRootNode = None
        nodeMap = {}  # original node -> its copy
        for nodeID, versionNodes in originalTree.currentNodes.items():
            nodes = []
            for versionNode in versionNodes:
                branchNode = Node(versionNode.nodeID, versionNode.version, versionNode.data)
                nodes.append(branchNode)
                nodeMap[versionNode] = branchNode
                if versionNode is originalTree.rootNode:
                    newRootNode = branchNode
            targetTree.currentNodes[nodeID] = nodes
        for versionNodes in originalTree.currentNodes.values():
            for versionNode in versionNodes:
                copied = nodeMap[versionNode]
                # Carry the parent link over too; without it edits on the
                # branch could not walk up to re-version the ancestors.
                copied.parentNode = nodeMap.get(versionNode.parentNode)
                for nodeID, sub in versionNode.subNodes.items():
                    copied.subNodes[nodeID] = nodeMap[sub]
        for snap in originalTree.snapshots:
            targetTree.snapshots.append(nodeMap[snap])
        if newRootNode is None:
            raise ValueError("rootNode not found")
        targetTree.rootNode = newRootNode
        targetTree.version = originalTree.version

    def searchByData(self, condition):
        """Return the data of every node version containing ``condition`` (case-insensitive)."""
        needle = condition.upper()
        results = []
        for versionNodes in self.currentNodes.values():
            for versionNode in versionNodes:
                if needle in versionNode.data.upper():
                    results.append(versionNode.data)
        return results

    def printTree(self, version=None):
        """Print the current tree and all snapshots, or only those of ``version``.

        The tree name starts the first header line; a blank line ends the output.
        """
        def printNode(node, depth=0):
            print("|" + "-" * depth + str(node))
            for subNode in node.subNodes.values():
                printNode(subNode, depth + 1)

        # The tree name shares a line with whichever header is printed first.
        pending = [str(self.treeName)]

        def emit(header):
            print(" ".join(pending + [header]))
            del pending[:]

        if version is None or self.version == version:
            emit("version: {0}".format(self.version))
            printNode(self.rootNode, 1)
        for snap in self.snapshots:
            if version is None or snap.version == version:
                emit("snap version: {0}".format(snap.version))
                printNode(snap, 1)
        print(" ".join(pending))
# version Tree
# copy on write
if __name__ == '__main__':
    tree = Tree()
    root = tree.addNode("root")
    folder1 = tree.addNode("folder1", root)
    folder2 = tree.addNode("folder2", root)
    folder11 = tree.addNode("folder11", folder1)
    folder12 = tree.addNode("folder12", folder1)
    folder21 = tree.addNode("folder21", folder2)
    folder22 = tree.addNode("folder22", folder2)
    tree.printTree()
    tree.snapshot()
    tree.snapshot()
    folder221 = tree.addNode("folder221", folder22)
    folder222 = tree.addNode("folder222", folder22)
    tree.printTree()  # v1
    tree.snapshot()
    tree.snapshot()
    tree.update(folder22, "folder22_new")
    tree.update(folder222, "folder222_new")
    tree.printTree()  # v2
    branch = tree.branch()
    tree.snapshot()
    tree.snapshot()
    tree.deleteNode(folder22)  # copy on write shows here: the node's parent gets backed up
    tree.printTree()  # v3
    tree.snapshot()
    tree.snapshot()
    # Reverting is dangerous because it deletes nodes; to make sure data is
    # never lost, always tag before reverting.
    tree.resvert(1)
    tree.printTree()
    branch.printTree()

    # Timing run: attach many nodes to randomly chosen existing nodes.
    st = time.time()
    random.seed()
    nodeIDs = []
    newNodeID = tree.addNode("node_", folder1)
    nodeIDs.append(newNodeID)
    for i in range(100000):
        # NOTE(review): "node" has no placeholder, so .format(i) leaves the
        # name unchanged - confirm whether an index was meant to be included.
        if i % 1000 == 0:
            st1 = time.time()
            targetNode = nodeIDs[random.randint(0, len(nodeIDs) - 1)]
            newNodeID = tree.addNode("node".format(i), targetNode, printTime=True)
            nodeIDs.append(newNodeID)
            st2 = time.time()
            print("time cost {0}".format(st2 - st1))
        else:
            targetNode = nodeIDs[random.randint(0, len(nodeIDs) - 1)]
            newNodeID = tree.addNode("node".format(i), targetNode)
            nodeIDs.append(newNodeID)
    # Time a single membership test against the node index.
    st1 = time.time()
    nodeID = targetNode = nodeIDs[random.randint(0, len(nodeIDs) - 1)]
    ss = nodeID in tree.currentNodes.keys()
    st2 = time.time()
    print("time cost {0}".format(st2 - st1))
    print(tree.searchByData("222"))
    print("No Endless Cycle")
    # The author observed a large gap between 10000 and 100000 iterations.
    print("time cost {0}".format(time.time() - st))
|
python list_files |
|
|
def list_files(directory):
    """Return ``directory`` plus every file and sub-directory below it.

    Paths are built by joining onto ``directory`` as given.  The result is a
    sorted list without duplicates; a directory that does not exist yields
    just ``[directory]``.
    """
    found = set([directory])
    for root, dirs, files in os.walk(directory):
        for name in files:
            found.add(os.path.join(root, name))
        for name in dirs:
            found.add(os.path.join(root, name))
    # Sorted so callers get a stable order instead of arbitrary set order.
    return sorted(found)
|
python __metaclass__ |
|
|
import os
class classFactory(type):
    """Demo metaclass: classes built by it answer unknown class attributes.

    Looking up an attribute that a class created by this metaclass does not
    have prints the attribute name and evaluates to ``None`` instead of
    raising ``AttributeError``.
    """

    def __new__(cls, name, base, dictory):
        return type.__new__(cls, name, base, dictory)

    def __init__(cls, name, base, dictory):
        super(classFactory, cls).__init__(name, base, dictory)

    def __getattr__(cls, attribute):
        """Called for attributes missing on the class itself; prints the name."""
        print(attribute)

    def method(cls, *args, **kwargs):
        """Print the positional and keyword arguments; callable on the class."""
        print("{0} {1}".format(args, kwargs))
# A class created by calling the metaclass directly.
Car = classFactory("Car", (object,), {"run": lambda self: "Car running"})


class Bus(Car):
    """Subclass of ``Car`` that overrides ``run``."""

    __metaclass__ = classFactory

    def __init__(self, *args):
        pass

    def run(self):
        return "Bus running"
if __name__ == "__main__":
    bus = Bus()
    print(bus.run())
    # NOTE(review): ``ddd`` is looked up on the instance; a metaclass
    # ``__getattr__`` only serves lookups on the class (``Bus.ddd``), so this
    # line is expected to raise AttributeError - confirm which was intended.
    print(bus.ddd)
|
python meta_path |
|
|
class xmlImport(object):
    """Demo import hook: claims every module name and fabricates the module.

    Each fabricated module contains a single attribute ``value = 1``.
    Because ``find_module`` accepts any name, the hook answers for every
    import it is asked about.
    """

    def find_module(self, fullname, path=None):
        """Always offer this object as the loader."""
        return self

    def load_module(self, fullname):
        """Return the module named ``fullname``, creating it on first use."""
        import sys
        import types
        if fullname in sys.modules:
            return sys.modules[fullname]
        mod = sys.modules.setdefault(fullname, types.ModuleType(fullname))
        # These two were misspelled as ``__load__`` and ``__file___`` and so
        # never set the attributes the import system looks at.
        mod.__loader__ = self
        mod.__file__ = "<%s>" % fullname
        if "." not in fullname:
            # Top-level names are presented as packages.
            mod.__path__ = []
            mod.__package__ = fullname
        else:
            mod.__package__ = fullname.rpartition(".")[0]
        exec("value = 1", mod.__dict__)
        return mod
import imp
import sys
# Install the hook, then import a module name that the hook has to supply.
sys.meta_path.append(xmlImport())
import os12
print(os12.value)
|
python load module |
|
|
import imp
import sys
import os
#old_imp = __buildins__.__import__
#def new_imp(module_name, globals=None, locals=None, fromlist=[]):
# print module_name
# return old_imp(module_name, globals, locals, fromlist)
#__buildins__.__import__ = new_imp
def _compile():
    """Compile a one-line loop from source text and run it (prints 0 to 9)."""
    # print(i) with parentheses runs on both Python 2 and Python 3.
    strCode = "for i in range(0,10): print(i)"
    code = compile(strCode, '', 'exec')
    exec(code)
def _generate_module(modName):
    """Return ``sys.modules[modName]``, creating an empty stand-in if absent.

    A newly created module carries only a docstring and is registered in
    ``sys.modules`` so later calls return the same object.
    """
    import types
    if modName in sys.modules:
        return sys.modules[modName]
    mod = types.ModuleType(modName)
    # Set directly; this used to be done by exec-ing a docstring literal.
    mod.__doc__ = "ext module loaded {0} parent module".format(modName)
    sys.modules[modName] = mod
    return mod
def _find_ext_module(extModName, module_dirs):
    """Load the extension module ``extModName`` from ``module_dirs``.

    When the loaded module has a callable ``__autoRun__`` it is invoked.
    If finding, loading or auto-running fails, an empty generated module is
    returned instead (best effort).
    """
    fn_ = None
    try:
        fn_, path, desc = imp.find_module(extModName, module_dirs)
        extMod = imp.load_module(extModName, fn_, os.path.realpath(path), desc)
        if callable(getattr(extMod, "__autoRun__", None)):
            extMod.__autoRun__()
    except Exception:
        # Still best effort, but no longer swallows KeyboardInterrupt or
        # SystemExit the way a bare ``except:`` did.
        extMod = _generate_module(extModName)
    finally:
        # find_module hands back an open file that the caller must close.
        if fn_ is not None:
            fn_.close()
    return extMod
def _get_pub_func(mod, modName, funcs):
    """Register the public callables of ``mod`` in ``funcs``.

    Every callable attribute whose name does not start with ``_`` is stored
    under ``"<modName>.<attribute>"``.  Classes whose name contains
    ``Error`` or ``Exception`` are left out.
    """
    for attr in dir(mod):
        if attr.startswith('_'):
            continue
        func = getattr(mod, attr)
        if not callable(func):
            continue
        if isinstance(func, type) and (
                'Error' in func.__name__ or 'Exception' in func.__name__):
            continue
        funcs['{0}.{1}'.format(modName, attr)] = func
def _load_module(modName, funcs=None, module_dirs=None):
    """Load ``modName`` from ``module_dirs`` (default: current directory).

    When the module is a package directory its ``ext`` sub-module is loaded
    as well.  If ``funcs`` is a dict, the public callables of the module and
    of the extension (if any) are registered in it under the module's name.
    Returns the loaded module; import errors propagate.
    """
    if module_dirs is None:
        module_dirs = ["."]
    fn_, path, desc = imp.find_module(modName, module_dirs)
    try:
        modulePath = os.path.realpath(path)
        mod = imp.load_module(modName, fn_, modulePath, desc)
    finally:
        # find_module hands back an open file (None for packages).
        if fn_ is not None:
            fn_.close()
    # Stays None for plain modules; this name used to be undefined in that
    # case and crashed the registration below.
    extMod = None
    if os.path.isdir(modulePath):
        extMod = _find_ext_module("ext", [modulePath])
    if type(funcs) is dict:
        _get_pub_func(mod, modName, funcs)
        if extMod is not None:
            _get_pub_func(extMod, modName, funcs)
    return mod
def call(modName, funName, funcs, arg=()):
    """Load ``modName`` and call its function ``funName`` with ``arg``.

    The function is invoked twice: once through the ``funcs`` registry (the
    result is printed) and once through the module attribute (the result is
    returned).  ``funcs`` is filled by the load.
    """
    mod = _load_module(modName, funcs)
    registered = funcs['{0}.{1}'.format(modName, funName)]
    print(registered(*arg))
    return getattr(mod, funName)(*arg)
if __name__ == "__main__":
    # The registry is shared, so it accumulates entries from all three loads.
    funcs = {}
    print(call("testImp", "test", funcs))
    print(call("testImp1", "test", funcs))
    print(call("testImp3", "test", funcs))
    print(funcs)
|
salt |
|
|
Salt官方提供了一个Salt Bootstrap项目用来进行快速安装。
wget -O - http://bootstrap.saltstack.org | sudo sh
查看所有连接上master的主机状态:
salt '*' test.ping
|
zmq |
|
|
ROUTER和DEALER分别就是XREP和XREQ(ROUTER对应XREP,DEALER对应XREQ)
|
select poll epoll |
|
|
select是个同步的IO多路复用函数,用来同时检查多个文件描述符的可用性。这个函数的局限性:最多检查1024个文件
poll函数是用来等待多个文件上的事件,其实和select一样,还是多路复用。poll函数比select函数的优点是,检查的文件没有上限了。但是效率还是差不多一样低
epoll机制使用3个函数配合完成epoll_create(2),epoll_ctl(2)和epoll_wait(2)
|
Reactor Proactor |
|
|
Reactor模式和Proactor模式。我们在Linux中使用epoll,当IO可读写的时候通知你,你再去同步读写,这就是所谓Reactor模式。而windows下的iocp或者Linux下的ZeroMQ则是数据发送完了或者接收完了再通知你,这就是所谓Proactor模式。其实说白了就是,Reactor给你的是读写权,Proactor给你的是数据。
|
ZeroMQ |
|
|
与其他的消息队列相比,ZeroMQ有以下一些特点
1.点对点无中间节点。
传统的消息队列都需要一个消息服务器来存储转发消息。而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致。因为消息服务器最终还是转化为服务器对其他节点的点对点消息传输上。ZeroMQ能缓存消息,但是是在发送端缓存。ZeroMQ里有水位设置的相关接口来控制缓存量。当然,ZeroMQ也支持传统的消息队列(通过zmq_device来实现)。
2.强调消息收发模式。
在点对点的消息传输上ZeroMQ将通信的模式做了归纳,比如常见的订阅模式(一个消息发多个客户),分发模式(N个消息平均分给X个客户)等等。下面是目前支持的消息模式配对,任何一方都可以做为服务端。
PUB and SUB
REQ and REP
REQ and XREP
XREQ and REP
XREQ and XREP
XREQ and XREQ
XREP and XREP
PUSH and PULL
PAIR and PAIR
3.以统一接口支持多种底层通信方式(线程间通信,进程间通信,跨主机通信)。
如果你想把本机多进程的软件放到跨主机的环境里去执行,通常要将IPC接口用套接字重写一遍。非常麻烦。而有了ZeroMQ就方便多了,只要把通信协议从"ipc:///xxx"改为"tcp://*.*.*.*:****"就可以了,其他代码通通不需要改,如果这个是从配置文件里读的话,那么程序就完全不要动了,直接复制到其他机器上就可以了。
4.异步,强调性能。
ZeroMQ设计之初就是为了高性能的消息发送而服务的,所以其设计追求简洁高效。它发送消息是异步模式,通过单独出一个IO线程来实现,所以消息发送调用之后不要立刻释放相关资源哦,会出错的(因为还没发送完),要把资源释放函数交给ZeroMQ让ZeroMQ发完消息自己释放。
|
zmq |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-14
@author: wave
'''
import threading
import random
import zmq
import time
import pickle
random.seed()
class ThreadManager(object):
    """Keeps track of the threads that are currently running a task."""

    def __init__(self):
        self.threads = []

    def add(self, extThread):
        self.threads.append(extThread)

    def remove(self, extThread):
        self.threads.remove(extThread)

    def pullThread(self, func=None, args=None, kwargs=None):
        """Return an unstarted thread that runs ``func(*args, **kwargs)``.

        While running, the thread is listed in ``globalThreadManager``; it is
        taken off the list again when ``func`` returns or raises.
        """
        class ExtThread(threading.Thread):
            def __init__(self, args=None, kwargs=None):
                if args is None:
                    args = ()
                if kwargs is None:
                    kwargs = {}
                threading.Thread.__init__(self, target=func, args=args, kwargs=kwargs)
                self.args = args
                self.kwargs = kwargs

            def run(self):
                globalThreadManager.add(self)
                try:
                    func(*self.args, **self.kwargs)
                finally:
                    # Unregister even when func raises; otherwise a failed
                    # thread would stay listed forever.
                    globalThreadManager.remove(self)

        return ExtThread(args, kwargs)


globalThreadManager = ThreadManager()
#
class Server(object):
    """One server of the demo: a REP endpoint for clients plus PUB/SUB links to peers.

    Requests for users hosted elsewhere are published under the ID of the
    server whose ID prefixes the target user ID; every server subscribes to
    its own ID on each peer's publisher.
    """

    def __init__(self, servers, serverID, port, subport, interface='tcp://127.0.0.1'):
        """Register this server in ``servers`` and remember its endpoints.

        ``port`` is used for the request/reply socket and ``subport`` for
        the publisher.
        """
        servers.append(self)
        self.serverID = serverID
        self.interface = interface
        self.port = port
        self.subport = subport
        self.users = []
        self.pubSocket = None  # set by the thread started in startPubServer
        self.subSockets = {}  # peer serverID -> SUB socket

    def addUser(self, users=None):
        """Add one ``User`` or a list/tuple of them; anything else is ignored."""
        if isinstance(users, (list, tuple)):
            self.users.extend(users)
        elif isinstance(users, User):
            self.users.append(users)

    def findUser(self, userID):
        """Return the hosted user with ``userID`` or ``None``."""
        for user in self.users:
            if user.userID == userID:
                return user
        return None

    def startPubServer(self):
        """Create the PUB socket in a helper thread and publish it as ``self.pubSocket``.

        NOTE(review): the helper assigns ``pubSocket`` after a one second
        pause while this method returns after 0.1 s, so the socket may not
        be available yet when the call returns - confirm callers wait.
        """
        def pubSide():
            context = zmq.Context()
            socket = context.socket(zmq.PUB)
            # Options are set before bind; the ZeroMQ documentation says most
            # options only affect binds/connects made afterwards.
            # Author's note: only PUB sockets drop further data once the
            # high-water mark is reached; the other types block instead.
            socket.setsockopt(zmq.HWM, 1)
            socket.setsockopt(zmq.SWAP, 25000000)
            socket.bind("{0}:{1}".format(self.interface, self.subport))
            time.sleep(1.0)
            self.pubSocket = socket

        pubThread = threading.Thread(target=pubSide, name="pubThread", args=())
        pubThread.start()
        time.sleep(0.1)

    def startSubServer(self, servers):
        """Subscribe, in one thread per peer, to messages addressed to this server."""
        self.subSockets = {}

        def subSide(serverID, subport):
            context = zmq.Context.instance()
            socket = context.socket(zmq.SUB)
            socket.connect("{0}:{1}".format(self.interface, subport))
            socket.setsockopt(zmq.SUBSCRIBE, self.serverID)
            self.subSockets[serverID] = socket
            while True:
                topic, msg = socket.recv_multipart()
                # NOTE: unpickling executes whatever the sender encoded; only
                # acceptable while every peer is trusted.
                self.handleSub(pickle.loads(msg))

        for server in servers:
            if server != self:
                subThread = threading.Thread(target=subSide, name="subThread",
                                             args=(server.serverID, server.subport))
                subThread.start()
        time.sleep(0.1)

    def handleSub(self, pyObj):
        """Handle a message forwarded by a peer; ``pyObj[1]`` is the recipient."""
        toWho = pyObj[1]
        time.sleep(0.1)
        print("subscribe save  {0}".format(toWho))

    def startBServer(self):
        """Serve client requests on the REP socket in a background thread."""
        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(self.interface, self.port))
            while True:
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)

        serverThread = threading.Thread(target=serverSide, name="serverThread", args=())
        serverThread.start()
        time.sleep(0.1)

    def handle(self, pyObj):
        """Process one client request and return the reply object.

        ``AddUser`` creates and returns a user whose ID is this server's ID
        followed by the current user count; ``GetAllUsers`` returns the
        hosted users.  Any other request is read as ``(fromWho, toWho, ...)``:
        a recipient hosted elsewhere gets the request published under the ID
        of each subscribed peer that prefixes ``toWho``.
        """
        if isinstance(pyObj, AddUser):
            newUserID = "{0}{1}".format(self.serverID, len(self.users))
            newUser = User(newUserID)
            self.users.append(newUser)
            return newUser
        if isinstance(pyObj, GetAllUsers):
            return self.users
        fromWho = pyObj[0]
        toWho = pyObj[1]
        if self.findUser(fromWho):
            print("save  {0}".format(fromWho))
        if self.findUser(toWho):
            print("save  {0}".format(toWho))
        else:
            for serverID in self.subSockets:
                if toWho.startswith(serverID):
                    self.pubSocket.send_multipart([serverID, pickle.dumps(pyObj, -1)])
        return "share successed"
class MsgHead(object):
    """Routing metadata carried with a message."""

    def __init__(self):
        # One entry is appended per Server the message passes through, to
        # guard against messages looping inside the system.
        self.routeSeq = []
        # Used for permission checks.
        self.lastServer = None
        # Used to trace the message through the whole flow.
        self.taskID = None
        # Whether to write logs.
        self.debugLevel = 0

    def validateHead(self):
        """Print a warning when the route is longer than 10 hops."""
        hops = len(self.routeSeq)
        if hops <= 10:
            return
        print("出现了消息死循环")
class BServer(object):
    """Front-end broker that relays each client request to one backend Server."""

    def __init__(self, port, servers):
        """Open one REQ socket per backend.

        port    -- port clients connect to; default bind port of startBServer().
        servers -- objects exposing ``interface``, ``port`` and ``serverID``.
        """
        self.port = port
        self.realSockets = {}
        for server in servers:
            context = zmq.Context()
            socket = context.socket(zmq.REQ)
            socket.connect("{0}:{1}".format(server.interface, server.port))
            self.realSockets[server.serverID] = socket

    def _forward(self, socket, pyObj):
        """Send *pyObj* to one backend and return its reply."""
        socket.send_pyobj(pyObj)
        return socket.recv_pyobj()

    def handle(self, pyObj):
        """Route one request and return the backend's reply.

        AddUser goes to a random backend, GetAllUsers to the backend named
        by ``pyObj.serverID``, and a share ``[fromWho, ...]`` to the backend
        whose ID prefixes ``fromWho``. Returns None when no backend matches.
        """
        if isinstance(pyObj, AddUser):
            if not self.realSockets:
                return None
            return self._forward(random.choice(list(self.realSockets.values())), pyObj)
        if isinstance(pyObj, GetAllUsers):
            socket = self.realSockets.get(pyObj.serverID)
            # An unknown server ID must not fall through to the share branch
            # below, which would index the request object and crash.
            if socket is None:
                return None
            return self._forward(socket, pyObj)
        for serverID, socket in self.realSockets.items():
            if pyObj[0].startswith(serverID):
                return self._forward(socket, pyObj)
        return None

    def startBServer(self, interface='tcp://127.0.0.1', port=None):
        """Serve clients on a REP socket in a background thread.

        port defaults to ``self.port`` so the bind address matches the port
        clients read from this object.
        """
        if port is None:
            port = self.port

        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(interface, port))
            while True:
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)
        serverThread = threading.Thread(target=serverSide, name="bserverThread", args=())
        serverThread.start()
class User(object):
    """A user known to a Server, identified by ``userID``."""

    def __init__(self, userID):
        self.userID = userID
class Share(object):
    """Record of a share: who sends what to whom."""

    def __init__(self, fromWho, toWho, what):
        self.fromWho, self.toWho, self.what = fromWho, toWho, what
class AddUser(object):
    """Request object asking a server to create a user."""

    def __init__(self, userName):
        self.userName = userName
class GetAllUsers(object):
    """Request object asking for the users of the server ``serverID``."""

    def __init__(self, serverID):
        self.serverID = serverID
def testClient():
    """Drive the demo through the broker: add users, list them, share twice.

    Reads the module-level ``interface`` and ``bServer`` created by the
    ``__main__`` block.
    """
    # client -> bServer -> server is synchronous;
    # server <-> server is asynchronous.
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("{0}:{1}".format(interface, bServer.port))

    def ask(request):
        socket.send_pyobj(request)
        return socket.recv_pyobj()

    for _ in range(20):
        ask(AddUser("userA")).userID

    usersByServer = {}
    for serverID in ("a", "b", "c"):
        usersByServer[serverID] = ask(GetAllUsers(serverID))
        print([user.userID for user in usersByServer[serverID]])

    sender = usersByServer["a"][0].userID
    for targetServer, what in (("b", "a book"), ("c", "another book")):
        print(ask([sender, usersByServer[targetServer][0].userID, what]))
class LogServer(object):
    """Placeholder; the constructor does nothing."""

    def __init__(self):
        return None
def aaa():
    """Sleep for 0.1 s; used as a dummy thread target."""
    delay = 0.1
    time.sleep(delay)
if __name__ == '__main__':
    # Show the registry before start, while running and after join.
    newThread = globalThreadManager.pullThread(aaa)
    print(globalThreadManager.threads)
    newThread.start()
    print(globalThreadManager.threads)
    newThread.join()
    print(globalThreadManager.threads)

    interface = 'tcp://127.0.0.1'
    servers = []
    # Each Server appends itself to ``servers``, in a, b, c order.
    serverA = Server(servers, "a", 1235, 1236)
    serverB = Server(servers, "b", 1237, 1238)
    serverC = Server(servers, "c", 1239, 1240)
    for node in servers:
        node.addUser()
    for node in servers:
        node.startPubServer()
    for node in servers:
        node.startSubServer(servers)
    for node in servers:
        node.startBServer()

    bServer = BServer(1234, servers)
    bServer.startBServer()
    time.sleep(0.1)
    testClient()
|
zmq push pull |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-15
@author: wave
'''
import zmq
import threading
import time
import pickle
class BWorker(object):
    """Relay worker: pulls messages from one port and pushes them to another."""

    def __init__(self, pullPort, pushPort):
        self.pullPort, self.pushPort = pullPort, pushPort

    def startServer(self):
        """Start the forwarding loop in a background thread."""
        def forward():
            context = zmq.Context()
            upstream = context.socket(zmq.PULL)
            upstream.connect("tcp://localhost:{0}".format(self.pullPort))
            downstream = context.socket(zmq.PUSH)
            downstream.connect("tcp://localhost:{0}".format(self.pushPort))
            while True:
                downstream.send(upstream.recv())

        threading.Thread(target=forward, name="bHandler", args=()).start()
class RealWorker(object):
    """Sink worker: binds a PULL socket and prints every unpickled message."""

    def __init__(self, pullPort):
        self.pullPort = pullPort

    def startServer(self):
        """Start the receive-and-print loop in a background thread."""
        def consume():
            context = zmq.Context()
            sink = context.socket(zmq.PULL)
            sink.bind("tcp://*:{0}".format(self.pullPort))
            while True:
                payload = sink.recv()
                print(pickle.loads(payload))

        threading.Thread(target=consume, name="pullHandler", args=()).start()
if __name__ == '__main__':
    # Pipeline: sender (PUSH, bound) -> BWorker -> RealWorker (PULL, bound).
    realWorker = RealWorker(5558)
    realWorker.startServer()

    pushPort = 5557
    context = zmq.Context()
    sender = context.socket(zmq.PUSH)
    sender.bind("tcp://*:{0}".format(pushPort))

    bWorker = BWorker(pushPort, realWorker.pullPort)
    bWorker.startServer()

    # Give the workers time to connect before pushing.
    time.sleep(0.5)
    sender.send(pickle.dumps(["abc", "def"], -1))
    sender.send(pickle.dumps(["xxx", "yyy"], -1))
|
zmq ser |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-18
@author: wave
'''
import zlib
import cPickle as pickle
import zmq
class SerializingSocket(zmq.Socket):
    """zmq.Socket with helpers for compressed pickles and numpy arrays."""

    def send_zipped_pickle(self, obj, flags=0, protocol=-1):
        """Pickle *obj*, zlib-compress it and send it as one message."""
        return self.send(zlib.compress(pickle.dumps(obj, protocol)), flags=flags)

    def recv_zipped_pickle(self, flags=0):
        """Receive one message, decompress it and return the unpickled object."""
        # NOTE(review): unpickling executes code chosen by the sender; only
        # use this with trusted peers.
        return pickle.loads(zlib.decompress(self.recv(flags)))

    def send_array(self, A, flags=0, copy=True, track=False):
        """Send array *A* as a JSON header (dtype, shape) plus its buffer."""
        self.send_json(dict(dtype=str(A.dtype), shape=A.shape), flags | zmq.SNDMORE)
        return self.send(A, flags, copy=copy, track=track)

    def recv_array(self, flags=0, copy=True, track=False):
        """Receive a message pair written by send_array and rebuild the array."""
        # numpy is not imported at the top of this script, so import it here.
        import numpy
        md = self.recv_json(flags=flags)
        payload = self.recv(flags=flags, copy=copy, track=track)
        A = numpy.frombuffer(buffer(payload), dtype=md['dtype'])
        return A.reshape(md['shape'])
if __name__ == '__main__':
    # numpy is used below but is not imported at the top of this script.
    import numpy

    ctx = zmq.Context.instance()
    req = SerializingSocket(ctx, zmq.REQ)
    rep = SerializingSocket(ctx, zmq.REP)
    rep.bind('inproc://a')
    req.connect('inproc://a')
    A = numpy.ones((1024, 1024))
    print("Array is %i bytes" % A.nbytes)
    # send/recv with pickle+zip
    req.send_zipped_pickle(A)
    B = rep.recv_zipped_pickle()
    # now try non-copying version
    rep.send_array(A, copy=False)
    C = req.recv_array(copy=False)
    print("Checking zipped pickle...")
    print("Okay" if (A == B).all() else "Failed")
    print("Checking send_array...")
    print("Okay" if (C == B).all() else "Failed")
|
zmq heartbeat |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-18
@author: wave
'''
import time
import zmq
from zmq.eventloop import ioloop, zmqstream
import threading
class HeartBeater(object):
    """Ping hearts on a timer and track which of them answer.

    Each beat sends the current ``lifetime`` on *pingstream*. A heart that
    echoes it back on *pongstream* counts as alive for that beat.
    """

    def __init__(self, loop, pingstream, pongstream, period=1000):
        """Register the pong handler and start the periodic beat.

        period is passed unchanged to ioloop.PeriodicCallback.
        """
        self.loop = loop
        self.period = period
        self.pingstream = pingstream
        self.pongstream = pongstream
        self.pongstream.on_recv(self.handle_pong)
        self.hearts = set()
        self.responses = set()
        self.lifetime = 0
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop)
        self.caller.start()

    def beat(self):
        """Settle the previous beat's responses, then send the next ping."""
        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        print(self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        # Compute both sets before mutating self.hearts. If failures were
        # computed after the new hearts had been added, every new heart
        # would be reported as failed in the same beat.
        newhearts = self.responses.difference(goodhearts)
        heartfailures = self.hearts.difference(goodhearts)
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)
        self.responses = set()
        print("%i beating hearts: %s" % (len(self.hearts), self.hearts))
        self.pingstream.send(str(self.lifetime))

    def handle_new_heart(self, heart):
        """Start tracking a heart that answered for the first time."""
        print("yay, got new heart %s!" % heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Stop tracking a heart that missed the last beat."""
        print("Heart %s failed :(" % heart)
        self.hearts.remove(heart)

    def handle_pong(self, msg):
        """Record msg[0] as alive when msg[1] echoes the current lifetime."""
        if msg[1] == str(self.lifetime):
            self.responses.add(msg[0])
        else:
            print("got bad heartbeat (possibly old?): %s" % msg[1])
def heartBeatMonitor():
    """Bind the ping (PUB) and pong (ROUTER) sockets and run the IO loop."""
    loop = ioloop.IOLoop()
    context = zmq.Context()

    pingSocket = context.socket(zmq.PUB)
    pingSocket.bind('tcp://127.0.0.1:5555')
    pongSocket = context.socket(zmq.ROUTER)
    pongSocket.bind('tcp://127.0.0.1:5556')

    pingStream = zmqstream.ZMQStream(pingSocket, loop)
    pongStream = zmqstream.ZMQStream(pongSocket, loop)
    HeartBeater(loop, pingStream, pongStream)
    loop.start()
def beat():
    """Start one heart: a forwarder device echoing pings back as pongs."""
    heart = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.SUB, zmq.DEALER)
    # Empty prefix: subscribe to every ping.
    heart.setsockopt_in(zmq.SUBSCRIBE, "")
    heart.connect_in('tcp://127.0.0.1:5555')
    heart.connect_out('tcp://127.0.0.1:5556')
    heart.start()
if __name__ == '__main__':
    # Start the monitor first and give it time to bind its sockets.
    thread = threading.Thread(target=heartBeatMonitor, args=())
    thread.start()
    time.sleep(0.2)
    # Then launch ten hearts.
    for i in range(10):
        thread = threading.Thread(target=beat, args=())
        thread.start()
|
zmq device |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-18
@author: wave
'''
import zmq
import os
import threading
import time
def server():
    """Run a queue broker with ten echo workers; blocks in zmq.device()."""
    print('Server {0}'.format(os.getpid()))

    def routine(context):
        # Worker: echo every request back after one second.
        socket = context.socket(zmq.REP)
        socket.connect("inproc://workers")
        while True:
            message = socket.recv()
            time.sleep(1)
            socket.send(message)

    context = zmq.Context()
    workers = context.socket(zmq.DEALER)
    workers.bind("inproc://workers")
    # The client-facing side of a queue device must be a ROUTER so that each
    # reply is routed back to the client that sent the request. A DEALER
    # here would hand replies out round-robin once several clients connect.
    clients = context.socket(zmq.ROUTER)
    clients.bind('tcp://127.0.0.1:5555')
    for i in range(10):
        thread = threading.Thread(target=routine, args=(context,))
        thread.start()
    # ZMQ_QUEUE     starts a queue device
    # ZMQ_FORWARDER starts a forwarder device
    # ZMQ_STREAMER  starts a streamer device
    zmq.device(zmq.QUEUE, clients, workers)
    print("Finished")
def client():
print 'Client', os.getpid()
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:5555')
while True:
data = zmq.Message(str(os.getpid()))
start = time.time()
socket.send(data)
data = socket.recv()
print time.time() - start, data
if __name__ == '__main__':
    # Broker first; give it 0.1 s to bind before the client connects.
    thread = threading.Thread(target=server, args=())
    thread.start()
    time.sleep(0.1)
    thread = threading.Thread(target=client, args=())
    thread.start()
|
python demo share zmq |
|
|
# -*- coding: utf-8 -*-
'''
Created on 2013-3-14
@author: wave
'''
import threading
import random
import zmq
import time
import pickle
random.seed()
class ThreadManager(object):
    """Registry of the worker threads that are currently executing.

    Threads built by pullThread() add themselves to ``threads`` when their
    ``run`` starts and remove themselves when it ends.
    """

    def __init__(self):
        self.threads = []

    def add(self, extThread):
        """Record *extThread* as running."""
        self.threads.append(extThread)

    def remove(self, extThread):
        """Forget *extThread*; list.remove raises ValueError if it is absent."""
        self.threads.remove(extThread)

    def pullThread(self, func=None, args=None, kwargs=None):
        """Return an unstarted thread that runs ``func(*args, **kwargs)``.

        While the thread runs it is listed in ``self.threads``.
        """
        # Register with the manager that built the thread, not with the
        # module-level instance, so independent managers do not mix entries.
        manager = self

        class ExtThread(threading.Thread):
            def __init__(self, args=None, kwargs=None):
                args = () if args is None else args
                kwargs = {} if kwargs is None else kwargs
                threading.Thread.__init__(self, target=func, args=args, kwargs=kwargs)
                self.args = args
                self.kwargs = kwargs

            def run(self):
                manager.add(self)
                try:
                    func(*self.args, **self.kwargs)
                finally:
                    # Always unregister, even when func raises.
                    manager.remove(self)

        return ExtThread(args, kwargs)


globalThreadManager = ThreadManager()
#
class Server(object):
    """One backend node of the demo.

    It stores users, answers requests on a REP socket, and relays shares
    aimed at users of other nodes through a PUB socket that the other nodes
    subscribe to.
    """

    def __init__(self, servers, serverID, port, subport, interface='tcp://127.0.0.1'):
        """Register this server in *servers* and remember its endpoints.

        servers   -- shared list; this instance appends itself to it.
        serverID  -- string used as user-ID prefix and as SUB topic.
        port      -- port bound by startBServer() (REP socket).
        subport   -- port bound by startPubServer() (PUB socket).
        interface -- transport/address prefix used for every endpoint.
        """
        servers.append(self)
        self.serverID = serverID
        self.interface = interface
        self.port = port
        self.subport = subport
        self.users = []
        # Created here so handle() never hits an AttributeError when it runs
        # before the PUB/SUB threads have finished their setup.
        self.pubSocket = None
        self.subSockets = {}

    def addUser(self, users=None):
        """Add one User or a list/tuple of users; anything else is ignored."""
        if users is None:
            return
        if isinstance(users, (list, tuple)):
            self.users.extend(users)
        elif isinstance(users, User):
            self.users.append(users)

    def findUser(self, userID):
        """Return the stored user whose ``userID`` matches, or None."""
        for user in self.users:
            if user.userID == userID:
                return user
        return None

    def startPubServer(self):
        """Bind the PUB socket in a background thread, then wait 0.1 s."""
        def pubSide():
            context = zmq.Context()
            socket = context.socket(zmq.PUB)
            socket.bind("{0}:{1}".format(self.interface, self.subport))
            # Except for PUB sockets, which drop further data once the
            # high-water mark is reached, every other socket type blocks.
            # NOTE(review): HWM/SWAP look like libzmq 2.x option names --
            # confirm against the installed pyzmq.
            socket.setsockopt(zmq.HWM, 1)
            socket.setsockopt(zmq.SWAP, 25000000)
            time.sleep(1.0)
            self.pubSocket = socket
        pubThread = threading.Thread(target=pubSide, name="pubThread", args=())
        pubThread.start()
        time.sleep(0.1)

    def startSubServer(self, servers):
        """Subscribe, one thread per peer, to every other server in *servers*."""
        self.subSockets = {}

        def subSide(serverID, interface, subport):
            # Context.instance() is a classmethod; calling it on a fresh
            # Context() would only create and discard an extra context.
            context = zmq.Context.instance()
            socket = context.socket(zmq.SUB)
            # Connect to the peer's own interface, not to ours.
            socket.connect("{0}:{1}".format(interface, subport))
            socket.setsockopt(zmq.SUBSCRIBE, self.serverID)
            self.subSockets[serverID] = socket
            while True:
                _topic, msg = socket.recv_multipart()
                # NOTE(review): pickle.loads runs arbitrary code from the
                # wire; only acceptable between trusted peers.
                self.handleSub(pickle.loads(msg))

        for server in servers:
            if server is not self:
                subThread = threading.Thread(
                    target=subSide, name="subThread",
                    args=(server.serverID, server.interface, server.subport))
                subThread.start()
        time.sleep(0.1)

    def handleSub(self, pyObj):
        """Handle a share relayed by a peer; pyObj[1] is the target user ID."""
        toWho = pyObj[1]
        time.sleep(0.1)
        print("subscribe save  {0}".format(toWho))

    def startBServer(self):
        """Serve requests on the REP socket in a background thread."""
        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(self.interface, self.port))
            while True:
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)
        serverThread = threading.Thread(target=serverSide, name="serverThread", args=())
        serverThread.start()
        time.sleep(0.1)

    def handle(self, pyObj):
        """Process one request and return the reply object.

        AddUser     -> creates and returns a new User.
        GetAllUsers -> returns the list of users of this server.
        otherwise   -> pyObj is a share ``[fromWho, toWho, ...]``; it is
                       handled locally or published to the owning peer.
        """
        if isinstance(pyObj, AddUser):
            newUserID = "{0}{1}".format(self.serverID, len(self.users))
            newUser = User(newUserID)
            self.users.append(newUser)
            return newUser
        if isinstance(pyObj, GetAllUsers):
            return self.users
        fromWho = pyObj[0]
        toWho = pyObj[1]
        if self.findUser(fromWho):
            print("save  {0}".format(fromWho))
        if self.findUser(toWho):
            print("save  {0}".format(toWho))
        else:
            for serverID in self.subSockets:
                if toWho.startswith(serverID):
                    if self.pubSocket is None:
                        # The PUB thread has not finished binding yet; fail
                        # the request instead of killing the REP loop.
                        return "share failed: publisher not ready"
                    self.pubSocket.send_multipart([serverID, pickle.dumps(pyObj, -1)])
        return "share successed"
class MsgHead(object):
    """Routing metadata carried with a message."""

    def __init__(self):
        # One entry is appended per Server the message passes through, to
        # guard against messages looping inside the system.
        self.routeSeq = []
        # Used for permission checks.
        self.lastServer = None
        # Used to trace the message through the whole flow.
        self.taskID = None
        # Whether to write logs.
        self.debugLevel = 0

    def validateHead(self):
        """Print a warning when the route is longer than 10 hops."""
        hops = len(self.routeSeq)
        if hops <= 10:
            return
        print("出现了消息死循环")
class BServer(object):
    """Front-end broker that relays each client request to one backend Server."""

    def __init__(self, port, servers):
        """Open one REQ socket per backend.

        port    -- port clients connect to; default bind port of startBServer().
        servers -- objects exposing ``interface``, ``port`` and ``serverID``.
        """
        self.port = port
        self.realSockets = {}
        for server in servers:
            context = zmq.Context()
            socket = context.socket(zmq.REQ)
            socket.connect("{0}:{1}".format(server.interface, server.port))
            self.realSockets[server.serverID] = socket

    def _forward(self, socket, pyObj):
        """Send *pyObj* to one backend and return its reply."""
        socket.send_pyobj(pyObj)
        return socket.recv_pyobj()

    def handle(self, pyObj):
        """Route one request and return the backend's reply.

        AddUser goes to a random backend, GetAllUsers to the backend named
        by ``pyObj.serverID``, and a share ``[fromWho, ...]`` to the backend
        whose ID prefixes ``fromWho``. Returns None when no backend matches.
        """
        if isinstance(pyObj, AddUser):
            if not self.realSockets:
                return None
            return self._forward(random.choice(list(self.realSockets.values())), pyObj)
        if isinstance(pyObj, GetAllUsers):
            socket = self.realSockets.get(pyObj.serverID)
            # An unknown server ID must not fall through to the share branch
            # below, which would index the request object and crash.
            if socket is None:
                return None
            return self._forward(socket, pyObj)
        for serverID, socket in self.realSockets.items():
            if pyObj[0].startswith(serverID):
                return self._forward(socket, pyObj)
        return None

    def startBServer(self, interface='tcp://127.0.0.1', port=None):
        """Serve clients on a REP socket in a background thread.

        port defaults to ``self.port`` so the bind address matches the port
        clients read from this object.
        """
        if port is None:
            port = self.port

        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(interface, port))
            while True:
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)
        serverThread = threading.Thread(target=serverSide, name="bserverThread", args=())
        serverThread.start()
class User(object):
    """A user known to a Server, identified by ``userID``."""

    def __init__(self, userID):
        self.userID = userID
class Share(object):
    """Record of a share: who sends what to whom."""

    def __init__(self, fromWho, toWho, what):
        self.fromWho, self.toWho, self.what = fromWho, toWho, what
class AddUser(object):
    """Request object asking a server to create a user."""

    def __init__(self, userName):
        self.userName = userName
class GetAllUsers(object):
    """Request object asking for the users of the server ``serverID``."""

    def __init__(self, serverID):
        self.serverID = serverID
def testClient():
    """Drive the demo through the broker: add users, list them, share twice.

    Reads the module-level ``interface`` and ``bServer`` created by the
    ``__main__`` block.
    """
    # client -> bServer -> server is synchronous;
    # server <-> server is asynchronous.
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("{0}:{1}".format(interface, bServer.port))

    def ask(request):
        socket.send_pyobj(request)
        return socket.recv_pyobj()

    for _ in range(20):
        ask(AddUser("userA")).userID

    usersByServer = {}
    for serverID in ("a", "b", "c"):
        usersByServer[serverID] = ask(GetAllUsers(serverID))
        print([user.userID for user in usersByServer[serverID]])

    sender = usersByServer["a"][0].userID
    for targetServer, what in (("b", "a book"), ("c", "another book")):
        print(ask([sender, usersByServer[targetServer][0].userID, what]))
def aaa():
    """Sleep for 0.1 s; used as a dummy thread target."""
    delay = 0.1
    time.sleep(delay)
if __name__ == '__main__':
    # Show the registry before start, while running and after join.
    newThread = globalThreadManager.pullThread(aaa)
    print(globalThreadManager.threads)
    newThread.start()
    print(globalThreadManager.threads)
    newThread.join()
    print(globalThreadManager.threads)

    interface = 'tcp://127.0.0.1'
    servers = []
    # Each Server appends itself to ``servers``, in a, b, c order.
    serverA = Server(servers, "a", 1235, 1236)
    serverB = Server(servers, "b", 1237, 1238)
    serverC = Server(servers, "c", 1239, 1240)
    for node in servers:
        node.addUser()
    for node in servers:
        node.startPubServer()
    for node in servers:
        node.startSubServer(servers)
    for node in servers:
        node.startBServer()

    bServer = BServer(1234, servers)
    bServer.startBServer()
    time.sleep(0.1)
    testClient()
|
python zmq |
|
|
import zmq
# Multiplex a PULL socket and a SUB socket with a single poller.
context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    # poll() blocks until a registered socket has an event.
    socks = dict(poller.poll())
    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()
    if socks.get(subscriber) == zmq.POLLIN:
        message = subscriber.recv()
|
python isportopen |
|
|
import socket
from string import ascii_letters, digits
def sanitize_host(host):
    """Keep only letters, digits, '.' and '-' from the first 255 characters."""
    allowed = ascii_letters + digits + '.-'
    return ''.join(char for char in host[0:255] if char in allowed)
def isportopen(host, port, timeout=None):
    """Return True if a TCP connection to *host*:*port* succeeds.

    host    -- host name or IPv4 address; passed through sanitize_host().
    port    -- int or numeric string; values outside 1..65535 return False.
    timeout -- optional socket timeout in seconds (None keeps the default).

    Raises ValueError when *port* is not numeric.
    """
    port = int(port)
    if not 1 <= port <= 65535:
        return False
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if timeout is not None:
            sock.settimeout(timeout)
        return sock.connect_ex((sanitize_host(host), port)) == 0
    except socket.error:
        # connect_ex only turns C-level connect() errors into a code; other
        # failures such as an unresolvable host still raise.
        return False
    finally:
        # Release the descriptor whatever the outcome.
        sock.close()
if __name__ == "__main__":
    # Probe a placeholder host name on port 50170.
    print(isportopen("----------", 50170))
|
python update dic |
|
|
import collections
def update(d, u):
    """Recursively merge mapping *u* into dict *d* and return *d*.

    Nested mappings are merged key by key; any other value in *u* replaces
    the value in *d*. When *u* holds a mapping where *d* holds a
    non-mapping (or nothing), the old value is replaced by a new dict.
    """
    try:
        from collections.abc import Mapping
    except ImportError:  # Python 2
        from collections import Mapping
    for key, value in u.items():
        if isinstance(value, Mapping):
            current = d.get(key)
            if not isinstance(current, Mapping):
                # Nothing mergeable is stored yet; start from an empty dict
                # instead of recursing into a scalar.
                current = {}
            d[key] = update(current, value)
        else:
            d[key] = value
    return d
if __name__ == "__main__":
    # Merge b into a and show the result.
    a = {"a": {"b": "xxxmyu"}, "b": {"b": "qwerty"}}
    b = {"a": {"b": "xxxmyu", "c": "asdas"}, "c": {"c": "ccccc"}}
    update(a, b)
    print(a)
|
需求只来自你对用户的了解。不来自调研、分析、讨论或竞争对手 |
|
|
|
python version |
|
|
__version_info__ = (0, 11, 1)
__version__ = '.'.join(map(str, __version_info__))
print(__version__)


def versions_report():
    """Yield one right-aligned ``name: version`` line per component.

    The first two lines are "Salt" and "Python"; the rest cover the
    libraries listed in ``libs``. A library that cannot be imported is
    reported as "not installed"; one without the expected attribute as
    "unknown version".
    """
    # sys is not imported at the top of this snippet.
    import sys
    try:
        string_types = basestring
    except NameError:  # Python 3 has no basestring
        string_types = str
    libs = (
        ("Jinja2", "jinja2", "__version__"),
        ("M2Crypto", "M2Crypto", "version"),
        ("msgpack-python", "msgpack", "version"),
        ("msgpack-pure", "msgpack_pure", "version"),
        ("pycrypto", "Crypto", "__version__"),
        ("PyYAML", "yaml", "__version__"),
        ("PyZMQ", "zmq", "__version__"),
    )
    # Width of the longest display name plus one.
    padding = max(len(lib[0]) for lib in libs) + 1
    fmt = '{0:>{pad}}: {1}'
    yield fmt.format("Salt", __version__, pad=padding)
    yield fmt.format(
        "Python", sys.version.rsplit('\n')[0].strip(), pad=padding
    )
    for name, modname, attr in libs:
        try:
            module = __import__(modname)
        except ImportError:
            yield fmt.format(name, "not installed", pad=padding)
            continue
        try:
            version = getattr(module, attr)
        except AttributeError:
            yield fmt.format(name, "unknown version", pad=padding)
            continue
        if not isinstance(version, string_types):
            # Tuple-style versions such as (0, 4, 2).
            version = '.'.join(map(str, version))
        yield fmt.format(name, version, pad=padding)
if __name__ == '__main__':
    # Print the report one line at a time.
    for line in versions_report():
        print(line)
|
python 常见函数 |
|
|
1.常用内置函数:(不用import就可以直接使用)
help(obj) 在线帮助, obj可是任何类型
callable(obj) 查看一个obj是不是可以像函数一样调用
repr(obj) 得到obj的表示字符串,可以利用这个字符串eval重建该对象的一个拷贝
eval(str) 对表示合法python表达式的字符串str求值,返回该表达式的值
dir(obj) 查看obj的name space中可见的name
hasattr(obj,name) 查看一个obj的name space中是否有name
getattr(obj,name) 得到一个obj的name space中的一个name
setattr(obj,name,value) 为一个obj的name space中的一个name指向value这个object
delattr(obj,name) 从obj的name space中删除一个name
vars(obj) 返回一个object的name space。用dictionary表示
locals() 返回一个局部name space,用dictionary表示
globals() 返回一个全局name space,用dictionary表示
type(obj) 查看一个obj的类型
isinstance(obj,cls) 查看obj是不是cls的instance
issubclass(subcls,supcls) 查看subcls是不是supcls的子类
类型转换函数
chr(i) 把一个ASCII数值,变成字符
ord(i) 把一个字符或者unicode字符,变成ASCII数值
oct(x) 把整数x变成八进制表示的字符串
hex(x) 把整数x变成十六进制表示的字符串
str(obj) 得到obj的字符串描述
list(seq) 把一个sequence转换成一个list
tuple(seq) 把一个sequence转换成一个tuple
dict(),dict(list) 转换成一个dictionary
int(x) 转换成一个integer
long(x) 转换成一个long integer
float(x) 转换成一个浮点数
complex(x) 转换成复数
max(...) 求最大值
min(...) 求最小值
用于执行程序的内置函数
compile 如果一段代码经常要使用,那么先编译,再运行会更快。
|
python包 |
|
|
distribute是setuptools的取代
distribute是对标准库distutils模块的增强
distutils主要是用来更加容易的打包和分发包,特别是对其他的包有依赖的包
curl -O http://python-distribute.org/distribute_setup.py
python distribute_setup.py
pip是easy_install的取代
curl -O https://raw.github.com/pypa/pip/master/contrib/get-pip.py
python get-pip.py
wget http://pypi.python.org/packages/source/p/pip/pip-0.7.2.tar.gz
tar xzf pip-0.7.2.tar.gz
cd pip-0.7.2
python setup.py install
Jinja2
Jinja是基于python的模板引擎
http://pypi.python.org/pypi/Jinja2
msgpack-python
MessagePack是一个基于二进制高效的对象序列化Library用于跨语言通信
M2Crypto
pycrypto
PyYAML
pyzmq >= 2.1.9
消息队列ZeroMQ的Python 封装
追求性能为主的消息队列实现,全部数据在内存中保存
如果担心数据持久化的问题,可以考虑RabbitMQ 等类似方案
Kestrel
Pyflakes
PyChecker
PyLint
Python代码语法检查工具
Durus
Python对象数据库,也可作为一种对象实例持久化的机制
开源的纯Python实现,并提供一个可选的C语言插件来大幅提高运行效率
Gluttony
Python模块之间依赖关系图自动生成工具
Heapy
Dowser
对Python程序进行内存占用剖析的模块
Coverage.py
Python测试代码覆盖率统计工具,内置于nose
pyinotify
利用操作系统自身提供的Notify机制以最高的效率监控文件变化
cairo
一个支持多种输出的向量图形库
|
Linux 修改时区tzselect |
|
|
tzselect
date -s 08/14/12
date -s 20:59:59
|
Lua |
|
|
lua -i -la -lb
dofile("lib1.lua")
lua -e "print(math.sin(12))"
lua -i -e "_PROMPT=' lua> '"
local path = "/usr/local/lua/lib/libluasocket.so"
local f = assert(loadlib(path, "luaopen_socket"))
f() -- actually open the library
|