`
ssdutliuhaibo
  • 浏览: 32747 次
  • 性别: Icon_minigender_1
  • 来自: 南京
最近访客 更多访客>>
社区版块
存档分类
最新评论
收藏列表
标题 标签 来源
python demo tree eTag
# -*- coding: utf-8 -*-
'''
Created on 2013-3-29
@author: wave
'''
import md5
import time

class IDGenerator(object):
    """Produce integer IDs: three prefix codes + local timestamp + a counter.

    The counter lives on the class, so every caller in the process shares it.
    """

    maxID = 0  # last counter value handed out

    @classmethod
    def genID(cls, moduleID="0000", agentID="0000", treeId="0000"):
        cls.maxID += 1
        stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        digits = "".join((moduleID, agentID, treeId, stamp, "%04d" % cls.maxID))
        return int(digits)
    
class ETagGenerator(object):
    """Produce eTags: three prefix codes + local timestamp + a counter.

    Tree.syncTree() uses eTags as a cursor (``eTag > syncToken``), so they
    must never go backwards.  The local clock can (e.g. at the end of DST the
    same hour repeats), so the last eTag issued for each prefix is remembered
    and any later value that would not exceed it is bumped to ``last + 1``.
    """

    maxID = 0  # last counter value handed out; shared through the class
    _lastETags = {}  # (moduleID, agentID, treeId) -> last eTag issued

    @classmethod
    def genETag(cls, moduleID="0000", agentID="0000", treeId="0000"):
        cls.maxID += 1
        eTag = int(moduleID + agentID + treeId
                   + time.strftime("%Y%m%d%H%M%S", time.localtime())
                   + ("%04d" % cls.maxID))

        prefix = (moduleID, agentID, treeId)
        last = cls._lastETags.get(prefix)
        if last is not None and eTag <= last:
            eTag = last + 1  # clock went backwards: stay strictly increasing
        cls._lastETags[prefix] = eTag
        return eTag

class Node(object):
    """A folder of the eTag tree; children refer back to it via parentNodeID."""

    def __init__(self, nodeID, parentNodeID, data, eTag):
        self.nodeID = nodeID
        self.parentNodeID = parentNodeID
        self.data = data  # folder payload (its name in the demo)
        self.eTag = eTag  # replaced whenever the folder changes

    def __str__(self):
        return "%s.%s" % (self.data, self.eTag)

class Leaf(object):
    """A file of the eTag tree; its content is stored elsewhere, keyed by digest."""

    def __init__(self, leafID, parentNodeID, data, dataSize, eTag):
        self.leafID = leafID
        self.parentNodeID = parentNodeID
        self.data = data  # file name
        self.dataSize = dataSize  # length of the file content
        self.eTag = eTag

    def __str__(self):
        return "%s.%s" % (self.data, self.eTag)
    
class Tree(object):
    """Flat, in-memory folder/file tree whose items carry eTags for syncing.

    Nodes (folders) and leaves (files) live in dictionaries keyed by ID and
    point at their parent through ``parentNodeID``.  File contents are stored
    once per upper-case MD5 hex digest in ``leafData``; ``leafMap`` records
    which digest each leaf uses.  Changes give the touched item (and, for
    leaf changes, its parent folder) a new eTag; ``syncTree`` pages through
    everything newer than a given token.
    """

    def __init__(self):
        self.rootNode = None

        self.currentNodes = {}  # nodeID -> Node
        self.currentLeafs = {}  # leafID -> Leaf

        self.leafData = {}  # content digest -> content (stored once)
        self.leafMap = {}  # leafID -> content digest

    @staticmethod
    def _digest(fileData):
        """Return the upper-case hex MD5 digest of *fileData*."""
        import hashlib  # replaces the deprecated Python 2 `md5` module
        return hashlib.md5(fileData).hexdigest().upper()

    def _storeData(self, fileData):
        """Store *fileData* under its digest unless already present; return the digest."""
        md5key = self._digest(fileData)
        self.leafData.setdefault(md5key, fileData)
        return md5key

    def _genNewNode(self, newNodeData, parentNodeID):
        """Create and register a node below *parentNodeID* with a fresh ID and eTag."""
        nodeID = IDGenerator.genID()
        newNode = Node(nodeID, parentNodeID, newNodeData, ETagGenerator.genETag())
        self.currentNodes[nodeID] = newNode
        return newNode

    def _genNewLeaf(self, leafInfo, parentNodeID):
        """Create and register a leaf below *parentNodeID*.

        *leafInfo* must hold ``fileName`` and either ``fileDigest`` (digest of
        content already stored in this tree) or ``fileData`` (raw content,
        stored if not present yet).

        Raises:
            Exception: ``fileDigest`` names content this tree does not hold.
        """
        if "fileDigest" in leafInfo:
            md5key = leafInfo["fileDigest"].upper()
            if md5key not in self.leafData:
                raise Exception("file not exists in tree")
            dataSize = len(self.leafData[md5key])
        else:
            fileData = leafInfo["fileData"]
            md5key = self._storeData(fileData)
            dataSize = len(fileData)

        leafID = IDGenerator.genID()
        newLeaf = Leaf(leafID, parentNodeID, leafInfo["fileName"], dataSize,
                       ETagGenerator.genETag())
        self.currentLeafs[leafID] = newLeaf
        self.leafMap[leafID] = md5key
        return newLeaf

    def _getNode(self, nodeID):
        """Return node *nodeID*; raise ValueError when it does not exist."""
        if nodeID in self.currentNodes:
            return self.currentNodes[nodeID]
        raise ValueError("no node found, {0}".format(nodeID))

    def _getLeaf(self, leafID):
        """Return leaf *leafID*; raise ValueError when it does not exist."""
        if leafID in self.currentLeafs:
            return self.currentLeafs[leafID]
        raise ValueError("no leaf found, {0}".format(leafID))

    def _getSubNodes(self, nodeID):
        """Return the direct child nodes of *nodeID* (linear scan)."""
        return [node for node in self.currentNodes.values()
                if node.parentNodeID == nodeID]

    def _getSubLeafs(self, nodeID):
        """Return the leaves directly below *nodeID* (linear scan)."""
        return [leaf for leaf in self.currentLeafs.values()
                if leaf.parentNodeID == nodeID]

    def addNode(self, newNodeData, targetNodeID=None):
        """Add a node below *targetNodeID* (the root when None) and return its ID.

        The first call creates the root; passing *targetNodeID* before a root
        exists returns None.

        Raises:
            ValueError: *targetNodeID* does not exist.
        """
        if self.rootNode is None:  # no root yet
            if targetNodeID is not None:
                return None  # nothing can be attached before the root exists
            self.rootNode = self._genNewNode(newNodeData, None)
            return self.rootNode.nodeID

        if targetNodeID is None:  # default parent is the root
            parentNodeID = self.rootNode.nodeID
        else:
            parentNodeID = self._getNode(targetNodeID).nodeID
        return self._genNewNode(newNodeData, parentNodeID).nodeID

    def addLeaf(self, leafData, targetNodeID):
        """Add a leaf described by *leafData* below *targetNodeID*; return its ID.

        The parent folder gets a new eTag so that syncing picks up the change.
        """
        parentNode = self._getNode(targetNodeID)
        newLeaf = self._genNewLeaf(leafData, parentNode.nodeID)
        parentNode.eTag = ETagGenerator.genETag()
        return newLeaf.leafID

    def updateNode(self, nodeID, newNodeData):
        """Replace the data of node *nodeID* and give it a new eTag."""
        node = self._getNode(nodeID)
        node.data = newNodeData
        node.eTag = ETagGenerator.genETag()

    def updateLeaf(self, leafID, newLeafInfo):
        """Rename and/or replace the content of leaf *leafID*.

        *newLeafInfo* may hold ``fileName`` and/or ``fileData``.  The leaf and
        its parent folder both get new eTags.
        """
        leaf = self._getLeaf(leafID)
        leaf.eTag = ETagGenerator.genETag()

        if "fileName" in newLeafInfo:
            leaf.data = newLeafInfo["fileName"]

        if "fileData" in newLeafInfo:
            fileData = newLeafInfo["fileData"]
            self.leafMap[leafID] = self._storeData(fileData)
            leaf.dataSize = len(fileData)  # previously left at the old size

        node = self._getNode(leaf.parentNodeID)
        node.eTag = ETagGenerator.genETag()

    def gcLeafData(self):
        """Report stored contents that no leaf references any more (nothing is deleted)."""
        referenced = set(self.leafMap.values())
        for md5Key, fileData in self.leafData.items():
            if md5Key not in referenced:
                print("%s is no more refered by any leaf, can be delete. fileData is %s" % (md5Key, fileData))

    def deepCopyNodes(self, sourceNodeIDs, targetNodeID):
        """Copy each subtree in *sourceNodeIDs* below *targetNodeID*, depth first.

        Each source subtree is captured before copying starts, so copying a
        folder into its own descendant terminates.  Leaves are copied by
        digest (content is shared, not duplicated): smallest first within a
        folder; sub-folders in ascending order of the total size of their
        direct leaves.
        """
        class SourceNode(object):
            def __init__(self, nodeID, nodeName):
                self.nodeID = nodeID
                self.nodeName = nodeName
                self.totalSize = 0  # total size of the direct leaves
                self.subNodes = []

            def __str__(self):
                return "{0}.{1}".format(self.nodeName, self.nodeID)

        def captureSubtree(node, leafs):
            """Return a SourceNode tree for *node*; fill *leafs* (nodeID -> sorted leafIDs)."""
            sourceNode = SourceNode(node.nodeID, node.data)
            subLeafs = sorted(self._getSubLeafs(node.nodeID), key=lambda leaf: leaf.dataSize)
            if subLeafs:
                leafs[node.nodeID] = [leaf.leafID for leaf in subLeafs]
                sourceNode.totalSize = sum(leaf.dataSize for leaf in subLeafs)
            for subNode in self._getSubNodes(node.nodeID):
                sourceNode.subNodes.append(captureSubtree(subNode, leafs))
            return sourceNode

        def copyLeafs(leafIDs, parentNodeID):
            for leafID in leafIDs:
                self._genNewLeaf({"fileName": self._getLeaf(leafID).data,
                                  "fileDigest": self.leafMap[leafID]},
                                 parentNodeID)

        def deepCopy(sourceNode, targetNode, leafs):
            newNode = self._genNewNode(sourceNode.nodeName, targetNode.nodeID)
            if sourceNode.nodeID in leafs:
                copyLeafs(leafs[sourceNode.nodeID], newNode.nodeID)
            for subNode in sorted(sourceNode.subNodes, key=lambda s: s.totalSize):
                deepCopy(subNode, newNode, leafs)
            newNode.eTag = ETagGenerator.genETag()  # newer than everything copied into it

        targetNode = self._getNode(targetNodeID)
        for sourceNodeID in sourceNodeIDs:
            leafs = {}
            rootSourceNode = captureSubtree(self._getNode(sourceNodeID), leafs)
            deepCopy(rootSourceNode, targetNode, leafs)

    def levelCopyNodes(self, sourceNodeIDs, targetNodeID):  # space usage is not considered
        """Copy each subtree in *sourceNodeIDs* below *targetNodeID*, level by level.

        All folders are created breadth first, then all leaves (by digest),
        then every new folder gets a fresh eTag.
        """
        class SourceNode(object):
            def __init__(self, nodeID, parentNodeID, nodeName):
                self.nodeID = nodeID
                self.parentNodeID = parentNodeID
                self.nodeName = nodeName

            def __str__(self):
                return "{0}.{1}".format(self.nodeName, self.nodeID)

        def collectLevels(node):
            """Return (levels, leafs): levels[d] holds the SourceNodes at depth d,
            leafs maps leafID -> nodeID of the folder holding it."""
            leafs = {}
            levels = []
            currentLevel = [SourceNode(node.nodeID, None, node.data)]
            while currentLevel:
                levels.append(currentLevel)
                nextLevel = []
                for sourceNode in currentLevel:
                    for subLeaf in self._getSubLeafs(sourceNode.nodeID):
                        leafs[subLeaf.leafID] = sourceNode.nodeID
                    for subNode in self._getSubNodes(sourceNode.nodeID):
                        nextLevel.append(SourceNode(subNode.nodeID, subNode.parentNodeID, subNode.data))
                currentLevel = nextLevel
            return levels, leafs

        targetNode = self._getNode(targetNodeID)
        for sourceNodeID in sourceNodeIDs:
            levels, leafs = collectLevels(self._getNode(sourceNodeID))

            nodeMap = {}  # source nodeID -> newly created Node
            for depth, sourceNodes in enumerate(levels):
                for sourceNode in sourceNodes:
                    if depth == 0:
                        parentNodeID = targetNode.nodeID
                    else:
                        parentNodeID = nodeMap[sourceNode.parentNodeID].nodeID
                    nodeMap[sourceNode.nodeID] = self._genNewNode(sourceNode.nodeName, parentNodeID)

            for sourceLeafID, sourceParentNodeID in leafs.items():
                self._genNewLeaf({"fileName": self._getLeaf(sourceLeafID).data,
                                  "fileDigest": self.leafMap[sourceLeafID]},
                                 nodeMap[sourceParentNodeID].nodeID)

            for node in nodeMap.values():
                node.eTag = ETagGenerator.genETag()

    def printTree(self):
        """Print the tree as indented lines; each leaf also shows its content digest."""
        if self.rootNode is None:
            return

        def printNode(nodeID, depth):
            node = self._getNode(nodeID)
            print("|" + "-" * depth + str(node))
            for subLeaf in self._getSubLeafs(nodeID):
                print("|" + "-" * (depth + 1) + str(subLeaf) + "   --> " + self.leafMap[subLeaf.leafID])
            for subNode in self._getSubNodes(nodeID):
                printNode(subNode.nodeID, depth + 1)

        printNode(self.rootNode.nodeID, 1)

    def syncTree(self, syncToken=None, pageSize=2):
        """Return one page of nodes and leaves changed after *syncToken*.

        Nodes reachable from the root whose eTag is greater than *syncToken*
        are sorted by eTag, largest first, and only the last *pageSize* (the
        smallest eTags) are kept.  The leaves considered are those directly
        below the kept nodes, filtered and paged the same way.  Both pages are
        printed and returned as ``(updatedNodes, updatedLeafs)``; a tree
        without a root yields ``([], [])``.
        """
        if syncToken is None:
            syncToken = 0
        if self.rootNode is None:
            return [], []

        updatedNodes = []
        pending = [self.rootNode]
        while pending:
            node = pending.pop()
            if node.eTag > syncToken:
                updatedNodes.append(node)
            pending.extend(self._getSubNodes(node.nodeID))

        # Largest eTag first, then keep the tail, i.e. the oldest changes.
        updatedNodes.sort(key=lambda node: node.eTag, reverse=True)
        updatedNodes = updatedNodes[-pageSize:]

        updatedLeafs = [leaf
                        for node in updatedNodes
                        for leaf in self._getSubLeafs(node.nodeID)
                        if leaf.eTag > syncToken]
        updatedLeafs.sort(key=lambda leaf: leaf.eTag, reverse=True)
        updatedLeafs = updatedLeafs[-pageSize:]

        # On paging reliability (the author's open question): the largest
        # node eTag is always larger than the largest leaf eTag, so will
        # leaves always be fetched first because of their eTags??

        for node in updatedNodes:
            print(node)
        for leaf in updatedLeafs:
            print(leaf)
        return updatedNodes, updatedLeafs
        
if __name__ == '__main__':  # versions are ignored; only eTag updates are exercised
    import hashlib

    def drainSync(tree, eTag):
        """Call tree.syncTree() page by page until no node is newer than *eTag*.

        Prints a blank line when done and returns the token for the next round.
        """
        while True:
            nodes, leafs = tree.syncTree(eTag)
            if not nodes:
                break
            eTag = nodes[0].eTag
            if leafs and leafs[0].eTag:  # prefer the leaf page's eTag as the next token
                eTag = leafs[0].eTag
        print("")
        return eTag

    tree = Tree()
    root = tree.addNode("root")
    folder1 = tree.addNode("folder1", root)
    folder2 = tree.addNode("folder2", root)
    folder3 = tree.addNode("folder3", root)
    folder11 = tree.addNode("folder11", folder1)
    folder12 = tree.addNode("folder12", folder1)
    folder21 = tree.addNode("folder21", folder2)
    folder22 = tree.addNode("folder22", folder2)

    eTag = drainSync(tree, 0)

    # Update nodes
    tree.updateNode(folder22, "folder22 new Node data")
    tree.updateNode(folder12, "folder12 new Node data")
    eTag = drainSync(tree, eTag)

    # Add a folder and files: file3 has the same content as file1, file4
    # refers to file2's content by digest only.
    folder13 = tree.addNode("folder13", folder1)
    leaf1 = tree.addLeaf({"fileName": "file1", "fileData": "1" * 32}, folder1)
    leaf2 = tree.addLeaf({"fileName": "file2", "fileData": "2" * 32}, folder1)
    leaf3 = tree.addLeaf({"fileName": "file3", "fileData": "1" * 32}, folder1)
    leaf4 = tree.addLeaf({"fileName": "file4",
                          "fileDigest": hashlib.md5("2" * 32).hexdigest().upper()},
                         folder1)

    # tree.updateLeaf(leaf1, {"fileData": "2" * 32})
    tree.updateLeaf(leaf3, {"fileData": "2" * 32})
    tree.gcLeafData()

    # TODO: if neither page comes back full, syncing is done and the final query could be skipped
    eTag = drainSync(tree, eTag)

    # tree.levelCopyNodes([root], folder11)
    # eTag = drainSync(tree, eTag)

    # tree.deepCopyNodes([root], folder12)
    # eTag = drainSync(tree, eTag)

    tree.printTree()
lxml xml to dict
from lxml import etree

def recursive_dict(element):
    """Convert an element tree into a ``(tag, value)`` pair.

    ``value`` is a dict mapping child tags to their converted values, or the
    element's ``text`` when it has no element children.  Children that share
    a tag are collected into a list in document order.  Comments, processing
    instructions and entities are skipped: iterating an element yields them
    too, but their ``tag`` is a factory function rather than a string, so
    they used to end up as bogus dict keys.  Attributes and text mixed with
    child elements are not represented.
    """
    theDict = {}
    for child in element:
        if callable(child.tag):  # comment / PI / entity, not an element
            continue
        key, value = recursive_dict(child)
        if key not in theDict:
            theDict[key] = value
        elif isinstance(theDict[key], list):
            theDict[key].append(value)
        else:
            theDict[key] = [theDict[key], value]

    return element.tag, theDict or element.text

def xml2dict(xmlStr):
    """Parse *xmlStr* and return the converted value of its root element (tag dropped)."""
    _rootTag, converted = recursive_dict(etree.fromstring(xmlStr))
    return converted

if __name__ == "__main__":
    # Sample document: repeated <item> tags turn into a list, nested
    # elements into nested dicts.
    xmlStr = """<root>
    <a>aValue</a>
    <b>
        <item>item1</item>
        <item>item2</item>
        <item>item2</item>
    </b>
    <c>
        <item1>
            <iitem>xxx</iitem>
        </item1>
        <item2>item2</item2>
    </c>
    </root>"""

    result = xml2dict(xmlStr)
    print(result)
python chroot
# -*- coding: utf-8 -*-  
'''
Created on 2013-3-25
@author: wave
'''
import os
import subprocess
import shutil

def which(eFile=None):
    """Return the path of executable *eFile*, or None if it cannot be found.

    *eFile* is first checked as given (absolute, or relative to the current
    directory), then looked up in every directory of ``$PATH``; a fixed list
    of system directories is used when ``PATH`` is unset.  Only regular files
    count: ``os.access(path, os.X_OK)`` alone is also true for directories.
    """
    def isExecutable(path):
        return os.path.isfile(path) and os.access(path, os.X_OK)

    if not eFile:
        return None
    if isExecutable(eFile):
        return eFile

    default_path = "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin"
    for path in os.environ.get('PATH', default_path).split(os.pathsep):
        full_path = os.path.join(path, eFile)
        if isExecutable(full_path):
            return full_path
    return None

def _run(cmd, cwd=None,
         stdout=subprocess.PIPE,
         stderr=subprocess.PIPE,
         env=(), rstrip=True, retcode=False
         ):
    if not cwd:
        cwd = os.path.expanduser('~')
        if not os.access(cwd, os.R_OK):
            cwd = '/'

    run_env = os.environ
    run_env.update(env)
    kwargs = {'cwd': cwd,
              'shell': True,
              'env': run_env,
              'stdout': stdout,
              'stderr': stderr}

    kwargs['executable'] = os.environ.get('SHELL', '/bin/sh')
    kwargs['close_fds'] = True

    if retcode:
        kwargs['stdout'] = None
        kwargs['stderr'] = None

    proc = subprocess.Popen(cmd, **kwargs)
    out, err = proc.communicate()

    if rstrip:
        if out is not None:
            out = out.rstrip()
        if err is not None:
            err = err.rstrip()

    ret = {}
    ret['stdout'] = out
    ret['stderr'] = err
    ret['pid'] = proc.pid
    ret['retcode'] = proc.returncode
    return ret

def ldd(eFile):
    """Return the shared-library dependencies that ``ldd`` prints for *eFile*.

    One entry per non-blank output line:

    * a line containing ``=>`` becomes ``(name, path)``.  *name* is the
      basename of the text left of ``=>``.  *path* is the first token on the
      right when it is an absolute path, and '' otherwise (e.g. ``not found``).
    * any other line yields its first token.
    """
    try:
        from shlex import quote  # Python 3.3+
    except ImportError:
        from pipes import quote  # Python 2

    # _run goes through a shell, so the file name must be quoted.
    output = _run('ldd %s' % quote(eFile), stderr=subprocess.STDOUT)['stdout']
    if isinstance(output, bytes) and not isinstance(output, str):
        output = output.decode("utf-8", "replace")  # Python 3 pipes yield bytes

    result = []
    for line in output.split("\n"):
        line = line.strip()
        if not line:
            continue
        left, sep, right = line.partition("=>")
        if sep:
            # Splitting on whitespace replaces `x[:x.find(" ")]`, which chopped
            # the last character when there was no space.  basename() replaces
            # `rfind("lib")`, which cut names such as "libglib-2.0.so.0".
            tokens = right.split()
            path = tokens[0] if tokens and tokens[0].startswith("/") else ""
            result.append((os.path.basename(left.strip()), path))
        else:
            result.append(line.split()[0])
    return result

def copy2Target(targetRootPath, refFiles):
    """Copy the files named in *refFiles* into the directory tree *targetRootPath*.

    Entries are paths, bare library names, or ``(name, path)`` pairs such as
    ldd() returns (both members of a pair are tried).  Destinations:

    * ``/usr/lib64/``, ``/lib64/`` paths go to ``lib64``;
    * ``/usr/lib/``, ``/lib/`` paths go to ``lib``;
    * ``/usr/bin/`` and ``/bin/`` paths keep their directory;
    * bare ``lib*`` names are looked up in ``/lib`` then ``/usr/lib``;
    * anything else goes to ``home``.

    Empty entries are skipped rather than attempted as a copy of ''.
    Missing source files are reported and skipped.
    """
    # (absolute prefix, destination below targetRootPath), checked in order
    prefixDirs = (("/usr/lib64/", "/lib64"),
                  ("/usr/lib/", "/lib"),
                  ("/lib64/", "/lib64"),
                  ("/lib/", "/lib"),
                  ("/usr/bin/", "/usr/bin"),
                  ("/bin/", "/bin"))

    def cp(sourceFile, targetPath):
        print("copy %s to %s" % (sourceFile, targetPath))
        if not os.path.exists(sourceFile):
            print("%s is not exist" % sourceFile)
            return
        if not os.path.exists(targetPath):
            os.makedirs(targetPath)
        shutil.copy2(sourceFile, targetPath)

    def copyFile(refFile):
        if not refFile:
            return
        for prefix, subDir in prefixDirs:
            if refFile.startswith(prefix):
                cp(refFile, targetRootPath + subDir)
                return
        if refFile.startswith("lib"):
            if os.path.exists("/lib/" + refFile):
                cp("/lib/" + refFile, targetRootPath + "/lib")
            elif os.path.exists("/usr/lib/" + refFile):
                cp("/usr/lib/" + refFile, targetRootPath + "/usr/lib")
            return
        cp(refFile, targetRootPath + "/home")

    for refFile in refFiles:
        if isinstance(refFile, (tuple, list)):
            copyFile(refFile[0])
            copyFile(refFile[1])
        else:
            copyFile(refFile)

def makeRoot(targetRootPath=".", commands=("csh",)):
    """Copy each command in *commands* and the libraries ldd lists for it into *targetRootPath*.

    Commands that which() cannot find are reported and skipped; previously
    None was handed on to ldd.  The executable itself is copied as well:
    ldd only lists the libraries it depends on.
    """
    for command in commands:
        fullPath = which(command)
        if fullPath is None:
            print("%s not found, skipped" % command)
            continue
        refFiles = ldd(fullPath)
        print(refFiles)
        copy2Target(targetRootPath, [fullPath] + refFiles)

# NOTE(review): runs at import time and populates the current directory.
makeRoot()
python tail call optimize
# -*- coding: utf-8 -*-
'''
Created on 2013-3-26
@author: wave
'''
import sys

class TailRecurseException(Exception):
    """Carries the arguments of a tail call back to the trampoline loop.

    Derives from Exception: instances of a plain old-style class cannot be
    raised on Python 3.
    """

    def __init__(self, args, kwargs):
        Exception.__init__(self)
        self.args = args
        self.kwargs = kwargs

def tail_call_optimized(g):
    """Decorate tail-recursive *g* so that its self-calls do not grow the stack.

    When the wrapper sees that its caller's caller runs this same wrapper
    code (a call made from inside *g*), it raises TailRecurseException
    instead of recursing.  The outermost wrapper catches it and calls *g*
    again with the new arguments.  Only correct when *g* makes its recursive
    call in tail position.

    NOTE(review): every decorated function shares the wrapper's code object,
    so a decorated function that calls a *different* decorated function gets
    that call hijacked by its own loop.
    """
    import functools

    @functools.wraps(g)
    def func(*args, **kwargs):
        f = sys._getframe()
        if f.f_back and f.f_back.f_back and f.f_back.f_back.f_code == f.f_code:
            raise TailRecurseException(args, kwargs)
        while True:
            try:
                return g(*args, **kwargs)
            except TailRecurseException as e:
                args = e.args
                kwargs = e.kwargs
    return func

@tail_call_optimized
def factorial(n, acc=1):
    """Return ``n! * acc`` for a non-negative integer *n* (tail recursive).

    Raises:
        ValueError: *n* is negative.  Without this check the countdown never
        reaches 0, and under the trampoline it would loop forever.
    """
    if n < 0:
        raise ValueError("factorial() not defined for negative values")
    if n == 0:
        return acc
    return factorial(n - 1, n * acc)

print(factorial(10000))

@tail_call_optimized
def fib(i, currentV=0, nextV=1):
    """Return the *i*-th Fibonacci number (fib(0) == 0) via an accumulator pair.

    Raises:
        ValueError: *i* is negative.  Without this check the countdown never
        reaches 0, and under the trampoline it would loop forever.
    """
    if i < 0:
        raise ValueError("fib() not defined for negative values")
    if i == 0:
        return currentV
    return fib(i - 1, nextV, currentV + nextV)

print(fib(10000))


if __name__ == '__main__':
    pass
AOP in python
from contextlib import contextmanager

@contextmanager
def tag(name):
    """Print ``<name>`` before and ``</name>`` after the body of a ``with`` block.

    The closing tag used to be ``"" % name``, which raised TypeError.  It is
    now printed from ``finally``, so it appears even when the body raises.
    """
    print("<%s>" % name)
    try:
        yield
    finally:
        print("</%s>" % name)

with tag("h1"):
    print("foo")



class test:
    """Minimal context manager: announces entry and exit, binds 1 to the ``as`` target."""

    def __exit__(self, *args):
        # Returns None (falsy), so an exception raised in the body propagates.
        print("exit")

    def __enter__(self):
        print("enter")
        return 1

def function():
    """Print a marker line and return the constant 111."""
    result = 111
    print('function()')
    return result

with test() as t:
    # t is the value returned by test.__enter__
    function()
    print('t is %s' % (t,))

demoTree
# -*- coding: utf-8 -*-
'''
Created on 2013-3-21
@author: wave
'''
import random
import time
class IDGenerator(object):
    """Hand out sequential integer IDs (1, 2, 3, ...) shared by the whole process."""

    maxID = 0  # last ID handed out; lives on the class, not on instances

    def __init__(self):
        pass  # no per-instance state

    @classmethod
    def genID(cls):
        cls.maxID += 1
        return cls.maxID

class Node(object):
    """One version of a tree node; children are kept in ``subNodes`` keyed by nodeID."""

    def __init__(self, nodeID, version, data):
        self.nodeID = nodeID
        self.version = version
        self.data = data

        self.parentNode = None
        self.subNodes = {}  # nodeID -> Node
        self.leafs = []

    def __str__(self):
        template = "{0}.v{1}"
        return template.format(self.data, self.version)
    
class Leaf(object):
    """Placeholder for a file entry; nothing in this snippet constructs it yet."""

    def __init__(self, data):
        # Keep the payload instead of silently discarding it.
        self.data = data
    
class Tree(object):
    """Versioned tree of Node objects with copy-on-write snapshots.

    ``currentNodes`` maps nodeID -> list of that node's versions, newest
    first.  ``snapshot()`` freezes the current root and starts a new
    ``version``.  Afterwards, changing a node from an older version first
    copies it and its ancestors into the current version (_updateParents),
    so frozen snapshots keep their subNodes.  ``parentNode`` links follow
    the newest copy of the parent.
    """

    def __init__(self, treeName="trunck"):
        self.treeName = treeName

        self.rootNode = None
        self.version = 0  # version that modifications currently go into

        self.snapshots = []  # frozen root nodes, newest first
        self.currentNodes = {}  # nodeID -> [Node, ...], newest version first

        self.tags = []  # backups recorded by tag()

    def _getVersionNode(self, nodeID, version=None):
        """Return node *nodeID* at *version* (its newest version when None).

        Raises:
            ValueError: no such node, or no such version of it.
        """
        versionNodes = self.currentNodes.get(nodeID)
        if versionNodes:
            if version is None:  # `if version:` wrongly treated version 0 as "newest"
                return versionNodes[0]
            for versionNode in versionNodes:
                if versionNode.version == version:
                    return versionNode
        raise ValueError("no node found, {0}.v{1}".format(nodeID, version))

    def _genNewNode(self, newNodeData, nodeID=None):
        """Create a node in the current version and register it as the newest version of *nodeID*.

        A fresh ID is allocated when *nodeID* is None.
        """
        if nodeID is None:
            nodeID = IDGenerator.genID()
        newNode = Node(nodeID, self.version, newNodeData)
        self.currentNodes.setdefault(nodeID, []).insert(0, newNode)
        return newNode

    def addNode(self, newNodeData, targetNodeID=None, printTime=False):
        """Add a node below *targetNodeID* (the root when None) and return its ID.

        The first call creates the root and must not pass *targetNodeID*
        (doing so returns None).  Ancestors from older versions are copied
        first.  With *printTime*, the parent lookup and the node creation are
        each timed and the durations printed.

        Raises:
            ValueError: *targetNodeID* does not exist.
        """
        if self.rootNode is None:  # no root yet
            if targetNodeID is not None:
                return None  # nothing can be attached before the root exists
            newNode = self._genNewNode(newNodeData)
            self.rootNode = newNode
            return newNode.nodeID

        def timed(action):
            """Run *action*; print how long it took when printTime is set."""
            if not printTime:
                return action()
            start = time.time()
            result = action()
            print("  time cost {0}".format(time.time() - start))
            return result

        if targetNodeID is None:  # default parent is the root
            parentNode = timed(lambda: self.rootNode)
        else:
            parentNode = timed(lambda: self._getVersionNode(targetNodeID))
        newNode = timed(lambda: self._genNewNode(newNodeData))

        nodesUpdated = {}
        self._updateParents(parentNode, nodesUpdated)

        newNode.parentNode = nodesUpdated.get(parentNode, parentNode)
        newNode.parentNode.subNodes[newNode.nodeID] = newNode
        return newNode.nodeID

    def _updateParents(self, node, nodesUpdated):
        """Copy *node* and its out-of-date ancestors into the current version.

        Walks up from *node*, copying each node until it meets one already in
        the current version (or passes the root).  *nodesUpdated* is filled
        with old node -> new copy.  The copies are then wired into the tree,
        and ``rootNode`` is replaced when the root was copied.
        """
        while node is not None and node.version != self.version:
            nodesUpdated[node] = self._genNewNode(node.data, node.nodeID)
            node = node.parentNode

        # Point each copy at its (possibly copied) parent and make that parent
        # reference the copy.  Without the second step, a walk that stopped at
        # an already-current ancestor left that ancestor holding the old
        # child, so the copy was unreachable from the root.
        for oldNode, updatedNode in nodesUpdated.items():
            parent = nodesUpdated.get(oldNode.parentNode, oldNode.parentNode)
            updatedNode.parentNode = parent
            if parent is not None:
                parent.subNodes[updatedNode.nodeID] = updatedNode

        # Carry the old children over to the copies and point those children
        # at the newest copy of their parent.
        for oldNode, updatedNode in nodesUpdated.items():
            for subNode in oldNode.subNodes.values():
                newSub = nodesUpdated.get(subNode, subNode)
                updatedNode.subNodes[newSub.nodeID] = newSub
                subNode.parentNode = updatedNode  # was misspelled `parnetNode`

        if self.rootNode in nodesUpdated:
            self.rootNode = nodesUpdated[self.rootNode]

    def update(self, targetNodeID, newNodeData):
        """Replace the data of node *targetNodeID*.

        A node already in the current version is edited in place and None is
        returned.  Otherwise a current-version copy takes the new data and
        the old node's children, ancestors are copied as needed, and the
        copy's ID is returned.  Returns None when there is no root.
        """
        if self.rootNode is None:  # no root yet
            return None
        targetNode = self._getVersionNode(targetNodeID)

        if targetNode.version == self.version:  # already current: edit in place
            targetNode.data = newNodeData
            return None

        newNode = self._genNewNode(newNodeData, targetNode.nodeID)

        nodesUpdated = {}
        self._updateParents(targetNode.parentNode, nodesUpdated)

        parent = nodesUpdated.get(targetNode.parentNode, targetNode.parentNode)
        newNode.parentNode = parent
        if parent is None:
            # Only the root has no parent (this used to raise AttributeError).
            self.rootNode = newNode
        else:
            parent.subNodes[newNode.nodeID] = newNode

        # The copy takes over the children, which now point at it.
        for subNode in targetNode.subNodes.values():
            newNode.subNodes[subNode.nodeID] = subNode
            subNode.parentNode = newNode

        return newNode.nodeID

    def deleteNode(self, targetNodeID):
        """Detach node *targetNodeID* from the current version of the tree.

        The parent (and its ancestors) are copied into the current version
        first, so snapshots still contain the node.

        Raises:
            ValueError: the node does not exist, or it is the root.
        """
        if self.rootNode is None:  # no root yet
            return
        targetNode = self._getVersionNode(targetNodeID)
        if targetNode.parentNode is None:
            raise ValueError("cannot delete the root node, {0}".format(targetNodeID))

        nodesUpdated = {}
        self._updateParents(targetNode.parentNode, nodesUpdated)

        # Remove the child entry from the current-version parent.
        parent = nodesUpdated.get(targetNode.parentNode, targetNode.parentNode)
        del parent.subNodes[targetNode.nodeID]

    def _fixParentNode(self):
        """Re-point parentNode of every node reachable from rootNode at its parent there.

        parentNode links follow the newest copy of a parent, and a revert may
        just have discarded that copy.
        """
        if self.rootNode is None:
            return
        self.rootNode.parentNode = None
        seen = set([self.rootNode])
        stack = [self.rootNode]
        while stack:
            node = stack.pop()
            for subNode in node.subNodes.values():
                if subNode in seen:
                    continue
                seen.add(subNode)
                subNode.parentNode = node
                stack.append(subNode)

    def snapshot(self):
        """Freeze the current root and start a new version, if this version changed anything."""
        changed = any(versionNodes and versionNodes[0].version == self.version
                      for versionNodes in self.currentNodes.values())
        if not changed:
            return
        self.snapshots.insert(0, self.rootNode)
        self.version += 1

    def _dropNodeVersions(self, shouldDrop):
        """Remove node versions for which *shouldDrop* is true; forget IDs left without versions."""
        for nodeID in list(self.currentNodes):
            kept = [node for node in self.currentNodes[nodeID] if not shouldDrop(node)]
            if kept:
                self.currentNodes[nodeID] = kept
            else:
                del self.currentNodes[nodeID]

    def resvert(self, version=None):
        """Revert to snapshot *version*, or drop the current version's changes when None.

        Reverting deletes node versions newer than the target, so the state is
        tagged first (see tag()) to keep it recoverable.
        """
        self.tag()

        if version is not None:
            for snap in self.snapshots:
                if snap.version == version:
                    self.rootNode = snap
                    # Continue in the version after the snapshot, so later
                    # edits copy nodes instead of changing the frozen snapshot.
                    self.version = snap.version + 1
            self.snapshots[:] = [snap for snap in self.snapshots if snap.version <= version]
            self._dropNodeVersions(lambda node: node.version > version)
        else:
            # Discard the working copies (previously a NameError on `versionNode`).
            self._dropNodeVersions(lambda node: node.version == self.version)
            if self.rootNode is not None and self.rootNode.version == self.version:
                self.rootNode = self.snapshots[0] if self.snapshots else None

        self._fixParentNode()

    def tag(self):  # may not mean the same as an svn tag
        """Record a backup of rootNode, snapshots and currentNodes in self.tags."""
        class Tag(object):
            def __init__(self, tagName, rootNode, snapshots, currentNodes):
                self.tagName = tagName
                self.rootNode = rootNode
                self.snapshots = snapshots
                self.currentNodes = currentNodes

        # Copy the per-node version lists too; sharing the list objects let a
        # later revert rewrite the backup.
        tagCurrentNodes = dict((nodeID, list(versionNodes))
                               for nodeID, versionNodes in self.currentNodes.items())
        tagSnapshots = list(self.snapshots)

        self.tags.append(Tag("tag{0}".format(len(self.tags)), self.rootNode,
                             tagSnapshots, tagCurrentNodes))

    def branch(self):
        """Return an independent deep copy of this tree named "branch"."""
        branch = Tree("branch")
        self._copyTree(branch, self)
        return branch

    def _copyTree(self, targetTree, originalTree):
        """Deep-copy nodes, links, snapshots and version of *originalTree* into *targetTree*.

        Raises:
            ValueError: the original root is not among its registered nodes.
        """
        newRootNode = None
        nodeMap = {}  # original node -> its copy
        for nodeID, versionNodes in originalTree.currentNodes.items():
            copies = []
            for versionNode in versionNodes:
                branchNode = Node(versionNode.nodeID, versionNode.version, versionNode.data)
                copies.append(branchNode)
                nodeMap[versionNode] = branchNode
                if versionNode is originalTree.rootNode:
                    newRootNode = branchNode
            targetTree.currentNodes[nodeID] = copies

        # Links can only be resolved once every copy exists.  Parent links
        # were not copied before, so any later change in the branch crashed.
        for original, branchNode in nodeMap.items():
            for nodeID, sub in original.subNodes.items():
                branchNode.subNodes[nodeID] = nodeMap[sub]
            branchNode.parentNode = nodeMap.get(original.parentNode)

        for snap in originalTree.snapshots:
            targetTree.snapshots.append(nodeMap[snap])

        if newRootNode is None:
            raise ValueError("rootNode not found")
        targetTree.rootNode = newRootNode
        targetTree.version = originalTree.version

    def searchByData(self, condition):
        """Return the data of every node version whose data contains *condition* (case-insensitive)."""
        needle = condition.upper()
        return [versionNode.data
                for versionNodes in self.currentNodes.values()
                for versionNode in versionNodes
                if needle in versionNode.data.upper()]

    def printTree(self, version=None):
        """Print the current tree and the snapshots; *version* selects a single one."""
        def printNode(node, depth=0):
            print("|" + "-" * depth + str(node))
            for subNode in node.subNodes.values():
                printNode(subNode, depth + 1)

        # The tree name shares a line with the first header printed, as the
        # Python 2 `print name,` did.
        pending = self.treeName + " "

        if version is None or self.version == version:
            print(pending + "version: {0}".format(self.version))
            pending = ""
            if self.rootNode is not None:
                printNode(self.rootNode, 1)

        for snap in self.snapshots:
            if version is None or snap.version == version:
                print(pending + "snap version: {0}".format(snap.version))
                pending = ""
                printNode(snap, 1)

        if pending:
            print(self.treeName)  # nothing matched: only the name line
        else:
            print("")
    
# version Tree
# copy on write

if __name__ == '__main__':
    tree = Tree()
    root = tree.addNode("root")
    folder1 = tree.addNode("folder1", root)
    folder2 = tree.addNode("folder2", root)
    folder11 = tree.addNode("folder11", folder1)
    folder12 = tree.addNode("folder12", folder1)
    folder21 = tree.addNode("folder21", folder2)
    folder22 = tree.addNode("folder22", folder2)
    tree.printTree()

    tree.snapshot()
    tree.snapshot()  # no-op: nothing changed since the previous snapshot
    folder221 = tree.addNode("folder221", folder22)
    folder222 = tree.addNode("folder222", folder22)
    tree.printTree()  # v1

    tree.snapshot()
    tree.snapshot()
    tree.update(folder22, "folder22_new")
    tree.update(folder222, "folder222_new")
    tree.printTree()  # v2

    branch = tree.branch()

    tree.snapshot()
    tree.snapshot()
    tree.deleteNode(folder22)  # copy-on-write: the deleted node's parent is copied first
    tree.printTree()  # v3

    tree.snapshot()
    tree.snapshot()
    # Reverting deletes newer node versions; resvert() tags the tree first so
    # that data is never lost for good.
    tree.resvert(1)
    tree.printTree()

    branch.printTree()

    # Rough timing: attach 100000 nodes below randomly chosen earlier ones.
    st = time.time()
    random.seed()
    nodeIDs = [tree.addNode("node_", folder1)]

    def addRandomNode(index, printTime=False):
        """Add "node<index>" below a random node from nodeIDs and remember its ID."""
        targetNode = random.choice(nodeIDs)
        # "node".format(i) had no placeholder, so every node was named "node".
        nodeIDs.append(tree.addNode("node{0}".format(index), targetNode, printTime=printTime))

    for i in range(100000):
        if i % 1000 == 0:  # time every 1000th insertion in detail
            st1 = time.time()
            addRandomNode(i, printTime=True)
            print("time cost {0}".format(time.time() - st1))
        else:
            addRandomNode(i)

    st1 = time.time()
    nodeID = random.choice(nodeIDs)
    found = nodeID in tree.currentNodes  # time a single lookup
    print("time cost {0}".format(time.time() - st1))

    print(tree.searchByData("222"))
    print("No Endless Cycle")
    print("time cost {0}".format(time.time() - st))  # 10000 vs 100000 nodes differ a lot
python list_files
def list_files(directory):
    """Return every path under *directory*, including *directory* itself.

    Files and sub-directories at all depths are included, each joined onto
    its parent path; duplicates are removed and the order is arbitrary.
    """
    found = {directory}
    for parent, subdirs, filenames in os.walk(directory):
        # Files first, then directories -- same insertion order as before.
        found.update(os.path.join(parent, entry) for entry in filenames + subdirs)
    return list(found)
    
python __metaclass__
import os
class classFactory(type):
    """Demo metaclass that traces failed class-attribute lookups.

    Classes created by it (by calling it directly or via ``__metaclass__``)
    print the name of any attribute that normal lookup on the *class* cannot
    find, and the lookup then evaluates to None.
    """

    def __new__(cls, name, base, dictory):
        # Straight pass-through to type: class name, bases tuple, namespace.
        new_class = type.__new__(cls, name, base, dictory)
        return new_class

    def __init__(cls, name, base, dictory):
        super(classFactory, cls).__init__(name, base, dictory)

    def __getattr__(cls, attribute):
        # Reached only when ordinary lookup on the class fails.  Nothing is
        # raised, so such lookups yield None (and hasattr() is always True).
        print(attribute)

    def method(cls, *args, **kwargs):
        # Same text as ``print args, kwargs``: str() of each, space-separated.
        print("{0} {1}".format(args, kwargs))
        
# Build a class by calling the metaclass directly rather than with a class
# statement; Car's type is therefore classFactory.
Car = classFactory("Car", (object,), dict(run=lambda self: "Car running"))


class Bus(Car):
    """Subclass of the dynamically built Car that overrides run()."""

    # Python 2 metaclass hook; Bus would inherit classFactory from Car anyway.
    __metaclass__ = classFactory

    def __init__(self, *args):
        # Positional arguments are accepted and ignored.
        pass

    def run(self):
        return "Bus running"
    
if __name__ == "__main__":
    bus = Bus()
    print(bus.run())
    # classFactory.__getattr__ only intercepts lookups on the class object
    # itself; an instance lookup such as ``bus.ddd`` raises AttributeError.
    print(Bus.ddd)
python meta_path
class xmlImport(object):
    """PEP 302 meta-path finder/loader that fabricates any module requested.

    Once appended to ``sys.meta_path`` it is consulted after the regular
    finders, so an import that would otherwise fail yields a synthetic
    module whose only content is ``value = 1``.
    """

    def find_module(self, fullname, path=None):
        # Claim every module name; this object is also the loader.
        return self

    def load_module(self, fullname):
        """Create (or reuse) module *fullname* and return it.

        The module is registered in ``sys.modules`` before it is populated.
        If populating it fails, that entry is removed again before the
        error propagates, as PEP 302 requires of loaders.
        """
        import types

        if fullname in sys.modules:
            return sys.modules[fullname]

        mod = sys.modules.setdefault(fullname, types.ModuleType(fullname))
        try:
            mod.__loader__ = self
            mod.__file__ = "<%s>" % fullname
            if "." not in fullname:
                # Top-level names are made packages so submodules can be
                # imported beneath them.
                mod.__path__ = []
                mod.__package__ = fullname
            else:
                mod.__package__ = fullname.rpartition(".")[0]

            exec("value = 1", mod.__dict__)
        except BaseException:
            del sys.modules[fullname]
            raise
        return mod

import imp
import sys

# Register the fabricating importer as a last-resort finder, then import a
# module name that xmlImport will have to supply.
finder = xmlImport()
sys.meta_path.append(finder)

import os12
print(os12.value)
python load module
import imp
import sys
import os

#old_imp = __buildins__.__import__
#def new_imp(module_name, globals=None, locals=None, fromlist=[]):
#    print module_name
#    return old_imp(module_name, globals, locals, fromlist)
#__buildins__.__import__ = new_imp

def _compile():
    """Compile a tiny loop from source text at runtime and execute it."""
    source = "for i in range(0,10): print i"
    exec(compile(source, '', 'exec'))

def _generate_module(modName):
    if modName in sys.modules:
        return sys.modules[modName]

    code = "'''ext module loaded {0} parent module'''".format(modName)
    mod = imp.new_module(modName)
    exec code in mod.__dict__
    sys.modules[modName] = mod
    return mod

def _find_ext_module(extModName, module_dirs):
    """Load the optional extension module *extModName* from *module_dirs*.

    When the module is found and defines a callable ``__autoRun__``, that
    callable is invoked once after loading.  On an ordinary failure (module
    not found, error while loading it, failing ``__autoRun__``) the function
    falls back to ``_generate_module(extModName)``.  That call returns the
    existing ``sys.modules`` entry if there is one, otherwise a placeholder.

    Returns the module object.
    """
    fn_ = None
    try:
        fn_, path, desc = imp.find_module(extModName, module_dirs)
        extMod = imp.load_module(extModName, fn_, os.path.realpath(path), desc)

        autoRun = getattr(extMod, "__autoRun__", None)
        if callable(autoRun):
            autoRun()
    except Exception:
        # Best-effort: a missing or broken extension must not break loading
        # of the parent.  Catching Exception (not everything) lets
        # KeyboardInterrupt / SystemExit propagate.
        extMod = _generate_module(extModName)
    finally:
        # imp.find_module may return an open file; closing it is the caller's job.
        if fn_ is not None:
            fn_.close()
    return extMod

def _get_pub_func(mod, modName, funcs):
    for attr in dir(mod):
        if attr.startswith('_'):
            continue
    
        if callable(getattr(mod, attr)):
            func = getattr(mod, attr)
            if isinstance(func, type):
                if any(['Error' in func.__name__, 'Exception' in func.__name__]):
                    continue
            
            funcs['{0}.{1}'.format(modName, attr)] = func

def _load_module(modName, funcs=None, module_dirs=None):
    """Import *modName* from *module_dirs* (default: the current directory).

    If the module is a package directory, its optional ``ext`` submodule is
    loaded as well (see _find_ext_module).  When *funcs* is a dict, the public
    callables of the module -- and of its ext module, if any -- are recorded
    in it under ``"<modName>.<name>"`` keys.

    Returns the imported module.
    """
    if module_dirs is None:
        module_dirs = ["."]

    fn_, path, desc = imp.find_module(modName, module_dirs)
    modulePath = os.path.realpath(path)
    try:
        mod = imp.load_module(modName, fn_, modulePath, desc)
    finally:
        # imp leaves any opened source file for the caller to close.
        if fn_ is not None:
            fn_.close()

    # Plain (non-package) modules have no ext submodule; extMod stays None
    # and is skipped below.
    extMod = None
    if os.path.isdir(modulePath):
        extMod = _find_ext_module("ext", [modulePath])

    if isinstance(funcs, dict):
        _get_pub_func(mod, modName, funcs)
        if extMod is not None:
            _get_pub_func(extMod, modName, funcs)

    return mod

def call(modName, funName, funcs, arg=None):
    """Load *modName* and run its *funName* twice, returning the second result.

    The first call goes through the *funcs* registry filled in by
    _load_module, and its result is printed.  The second looks the function
    up directly on the module object, and its result is returned.  *arg* is
    the sequence of positional arguments passed to both calls (default:
    none).
    """
    # None instead of a shared mutable default list.
    if arg is None:
        arg = ()
    mod = _load_module(modName, funcs)

    fun = funcs['{0}.{1}'.format(modName, funName)]
    print(fun(*arg))

    fun = getattr(mod, funName)
    return fun(*arg)

if __name__ == "__main__":
    funcs = {}
    # The demo modules are looked up in the current directory, because
    # _load_module defaults module_dirs to ["."].
    for demoModule in ("testImp", "testImp1", "testImp3"):
        print(call(demoModule, "test", funcs))
    print(funcs)
salt
Salt官方提供了一个Salt Bootstrap项目用来进行快速安装。

wget -O - http://bootstrap.saltstack.org | sudo sh 

查看所有连接上master的主机状态:

salt '*' test.ping 
zmq
ROUTER和DEALER就是XREP和XREQ的新名字:ROUTER即原来的XREP,DEALER即原来的XREQ。
select poll epoll
select是个同步的IO多路复用函数,用来同时检查多个文件描述符是否可读、可写或出错。这个函数的局限性:能检查的文件描述符数量受FD_SETSIZE限制(通常为1024);而且每次调用都要把整个描述符集合传给内核,由内核线性扫描。

poll函数是用来等待多个文件描述符上的事件的,其实和select一样,还是多路复用。poll函数比select函数的优点是,能检查的文件描述符数量没有固定上限。但每次调用仍需传入全部描述符并由内核线性扫描,所以描述符很多时,效率和select差不多一样低。

epoll机制使用3个函数配合完成:epoll_create(2)创建epoll实例;epoll_ctl(2)注册、修改或删除要监听的文件描述符;epoll_wait(2)等待事件,只返回已经就绪的描述符。因此epoll的开销与就绪描述符的数量相关,而不是与监听的总数相关。
Reactor Proactor
Reactor模式和Proactor模式。我们在Linux中使用epoll,当IO可读写的时候通知你,你再去同步读写,这就是所谓Reactor模式。而Windows下的IOCP(以及Linux下的AIO)则是数据发送完了或者接收完了再通知你,这就是所谓Proactor模式。ZeroMQ对使用者呈现的效果与此类似(拿到的是完整的消息),但它在Linux上内部仍是基于epoll的Reactor实现。其实说白了就是,Reactor给你的是读写权,Proactor给你的是数据。
ZeroMQ
与其他的消息队列相比,ZeroMQ有以下一些特点
1.点对点无中间节点。
传统的消息队列都需要一个消息服务器来存储转发消息。而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致。因为即使使用消息服务器,最终也还是要转化为服务器与其他节点之间的点对点消息传输。ZeroMQ能缓存消息,但是是在发送端缓存。ZeroMQ里有水位(HWM)设置的相关接口来控制缓存量。当然,ZeroMQ也支持传统的消息队列(通过zmq_device来实现)。

2.强调消息收发模式。
在点对点的消息传输上ZeroMQ将通信的模式做了归纳,比如常见的订阅模式(一个消息发多个客户),分发模式(N个消息平均分给X个客户)等等。下面是目前支持的消息模式配对,任何一方都可以作为服务端。
PUB and SUB
REQ and REP
REQ and XREP
XREQ and REP
XREQ and XREP
XREQ and XREQ
XREP and XREP
PUSH and PULL
PAIR and PAIR

3.以统一接口支持多种底层通信方式(线程间通信,进程间通信,跨主机通信)。
如果你想把本机多进程的软件放到跨主机的环境里去执行,通常要将IPC接口用套接字重写一遍。非常麻烦。而有了ZeroMQ就方便多了,只要把通信协议从"ipc:///xxx"改为"tcp://*.*.*.*:****"就可以了,其他代码通通不需要改,如果这个是从配置文件里读的话,那么程序就完全不要动了,直接复制到其他机器上就可以了。

4.异步,强调性能。
ZeroMQ设计之初就是为了高性能的消息发送而服务的,所以其设计追求简洁高效。它发送消息是异步模式,通过单独的IO线程来实现。所以调用发送之后不要立刻释放相关资源,否则会出错(因为消息可能还没发送完);应该把资源释放函数交给ZeroMQ(例如zmq_msg_init_data的释放回调),让ZeroMQ发完消息后自己释放。

zmq
# -*- coding: utf-8 -*-
'''
Created on 2013-3-14
@author: wave
'''
import threading
import random
import zmq
import time
import pickle

random.seed()


class ThreadManager(object):
    """Registry of the threads created by pullThread() that are running now.

    A thread registers itself with the manager that created it when its
    run() starts.  It deregisters when the target returns *or raises*.
    """

    def __init__(self):
        self.threads = []

    def add(self, extThread):
        self.threads.append(extThread)

    def remove(self, extThread):
        self.threads.remove(extThread)

    def pullThread(self, func=None, args=None, kwargs=None):
        """Return an unstarted thread that will run func(*args, **kwargs).

        While func runs, the thread object is listed in this manager's
        ``threads``.  Call start() on the result to run it.
        """
        manager = self

        class ExtThread(threading.Thread):
            def __init__(self, args=None, kwargs=None):
                if args is None:
                    args = ()
                if kwargs is None:
                    kwargs = {}
                threading.Thread.__init__(self, target=func, args=args, kwargs=kwargs)

                self.args = args
                self.kwargs = kwargs

            def run(self):
                manager.add(self)
                try:
                    func(*self.args, **self.kwargs)
                finally:
                    # Deregister even if func raises, so no stale entries remain.
                    manager.remove(self)

        return ExtThread(args, kwargs)


globalThreadManager = ThreadManager()

#
      
class Server(object):
    def __init__(self, servers, serverID, port, subport, interface='tcp://127.0.0.1'):
        servers.append(self)
        
        self.serverID = serverID
        self.interface = interface
        self.port = port
        self.subport = subport
        
        self.users = []
    
    def addUser(self, users=None):
        if type(users) is list or type(users) is tuple:
            self.users.extend(users)
        elif isinstance(users, User):
            self.users.append(users)
        else:
            pass
        
    def findUser(self, userID):
        for user in self.users:
            if user.userID == userID:
                return user

    def startPubServer(self):
        def pubSide():
            context = zmq.Context()
            socket = context.socket(zmq.PUB)
            socket.bind("{0}:{1}".format(self.interface, self.subport))

            socket.setsockopt(zmq.HWM, 1) # 除了PUB型会在达到高水位丢弃后续数据外,其他类型的都会以阻塞的形式来应对后续数据
            socket.setsockopt(zmq.SWAP, 25000000)   

            time.sleep(1.0)   
            
            self.pubSocket = socket
                
        pubThread = threading.Thread(target=pubSide, name="pubThread", args=())
        pubThread.start()
        
        time.sleep(0.1)

    def startSubServer(self, servers):
        self.subSockets = {}
        def subSide(serverID, subport):
            context = zmq.Context().instance()
            socket = context.socket(zmq.SUB)
            socket.connect("{0}:{1}".format(self.interface, subport))
            
            socket.setsockopt(zmq.SUBSCRIBE, self.serverID)   
            
            self.subSockets[serverID] = socket
            
            while True:
                topic, msg = socket.recv_multipart()
                self.handleSub(pickle.loads(msg))
        
        for server in servers:
            if server != self:
                subThread = threading.Thread(target=subSide, name="subThread", args=(server.serverID, server.subport))
                subThread.start()
        time.sleep(0.1)
    
    def handleSub(self, pyObj):
        fromWho = pyObj[0]
        toWho = pyObj[1]
        
        time.sleep(0.1)
        print "subscribe save ", toWho 
        
        
    def startBServer(self):
        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(self.interface, self.port))
            
            while True :
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)
                
        serverThread = threading.Thread(target=serverSide, name="serverThread", args=())
        serverThread.start()
        
        time.sleep(0.1)
   
    def handle(self, pyObj):
        if isinstance(pyObj, AddUser):
            newUserID = "{0}{1}".format(self.serverID, len(self.users))
            newUser = User(newUserID)
            self.users.append(newUser)
            return newUser
        if isinstance(pyObj, GetAllUsers):  
            return self.users
        
        fromWho = pyObj[0]
        toWho = pyObj[1]
        
        fromUser = self.findUser(fromWho)
        if fromUser:
            print "save ", fromWho
            
            targetUser = self.findUser(toWho)
            if targetUser:
                print "save ", toWho
            else:
                for serverID, _ in self.subSockets.iteritems():
                    if toWho.startswith(serverID):
                        self.pubSocket.send_multipart([serverID, pickle.dumps(pyObj, -1)])
        return "share successed"

class MsgHead(object):
    """Routing metadata meant to accompany a message between servers."""

    def __init__(self):
        # Meant to gain one entry per Server the message passes through, so
        # that message loops inside the system can be detected.
        self.routeSeq = []
        # Intended for permission checks.
        self.lastServer = None
        # Intended for end-to-end tracing of a task.
        self.taskID = None
        # Whether to write logs.
        self.debugLevel = 0

    def validateHead(self):
        """Print a warning ("a message loop occurred") if routeSeq exceeds 10 hops."""
        hops = len(self.routeSeq)
        if hops <= 10:
            return
        print("出现了消息死循环")

class BServer(object):
    """Front-end broker that forwards client requests to back-end Servers.

    Keeps one REQ socket per Server, keyed by serverID, and serves clients
    on its own REP socket (see startBServer).
    """

    def __init__(self, port, servers):
        self.port = port
        self.realSockets = {}
        # One context serves all outgoing sockets; it is kept on the instance
        # so it lives as long as they do.
        self.context = zmq.Context()
        for server in servers:
            socket = self.context.socket(zmq.REQ)
            socket.connect("{0}:{1}".format(server.interface, server.port))

            self.realSockets[server.serverID] = socket

    def handle(self, pyObj):
        """Forward *pyObj* to the right Server and return its reply.

        * AddUser     -> a randomly chosen server.
        * GetAllUsers -> the server whose ID equals pyObj.serverID, or None
          if there is no such server.
        * otherwise   -> pyObj is [fromWho, toWho, what] and goes to the
          server whose ID prefixes fromWho; None if no server matches.
        """
        if isinstance(pyObj, AddUser):
            randInt = random.randint(0, len(self.realSockets) - 1)
            randomRealSocket = list(self.realSockets.values())[randInt]
            randomRealSocket.send_pyobj(pyObj)
            return randomRealSocket.recv_pyobj()

        if isinstance(pyObj, GetAllUsers):
            socket = self.realSockets.get(pyObj.serverID)
            if socket is None:
                # Unknown server: falling through would index the request
                # object below and raise TypeError in the serving thread.
                return None
            socket.send_pyobj(pyObj)
            return socket.recv_pyobj()

        for serverID, socket in self.realSockets.items():
            if pyObj[0].startswith(serverID):
                socket.send_pyobj(pyObj)
                return socket.recv_pyobj()
        return None

    def startBServer(self, interface='tcp://127.0.0.1', port=None):
        """Serve client requests on *interface*:*port* in a background thread.

        *port* defaults to ``self.port``, the port given to the constructor.
        """
        if port is None:
            port = self.port

        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(interface, port))

            while True:
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)

        serverThread = threading.Thread(target=serverSide, name="bserverThread", args=())
        serverThread.start()
        
class User(object):
    """A user hosted on a Server; Server.handle derives its ID from the server's ID."""

    def __init__(self, userID):
        self.userID = userID

class Share(object):
    """One item (*what*) shared from user *fromWho* to user *toWho*."""

    def __init__(self, fromWho, toWho, what):
        self.fromWho, self.toWho, self.what = fromWho, toWho, what
        
class AddUser(object):
    """Request to create a new user; Server.handle does not read userName."""

    def __init__(self, userName):
        self.userName = userName

class GetAllUsers(object):
    """Request for the user list of the Server identified by *serverID*."""

    def __init__(self, serverID):
        self.serverID = serverID

def testClient(address=None):
    """Exercise the demo cluster through the BServer front end.

    *address* is the BServer endpoint.  By default it is built from the
    module-level ``interface`` and ``bServer`` globals set up in __main__.

    Creates 20 users and prints each server's user IDs.  It then shares an
    item from the first user on server "a" to the first users on "b" and
    "c", printing the replies.  The client socket is closed on exit.
    """
    if address is None:
        address = "{0}:{1}".format(interface, bServer.port)

    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect(address)
    try:
        # client -> bServer -> server is synchronous (REQ/REP);
        # server <-> server is asynchronous (PUB/SUB).
        for _ in range(20):
            socket.send_pyobj(AddUser("userA"))
            socket.recv_pyobj().userID  # raises unless the reply has a userID

        usersByServer = {}
        for serverID in ("a", "b", "c"):
            socket.send_pyobj(GetAllUsers(serverID))
            usersByServer[serverID] = socket.recv_pyobj()
            print([pyobj.userID for pyobj in usersByServer[serverID]])

        sender = usersByServer["a"][0].userID

        socket.send_pyobj([sender, usersByServer["b"][0].userID, "a book"])
        print(socket.recv_pyobj())

        socket.send_pyobj([sender, usersByServer["c"][0].userID, "another book"])
        print(socket.recv_pyobj())
    finally:
        socket.close()
    
class LogServer(object):
    """Placeholder for a logging server; it has no state or behaviour yet."""
    

def aaa():
    """Dummy thread target for the ThreadManager demo: sleep for 100 ms."""
    pause = 0.1
    time.sleep(pause)
if __name__ == '__main__':
    # ThreadManager demo: print the registry before start, (probably) while
    # aaa() is running, and after join.
    newThread = globalThreadManager.pullThread(aaa)
    print(globalThreadManager.threads)
    newThread.start()
    print(globalThreadManager.threads)
    newThread.join()
    print(globalThreadManager.threads)

    interface = 'tcp://127.0.0.1'

    # Three back-end servers: (id, REP port, PUB port).  Each constructor
    # appends the server to `servers`, so the loops below run in A, B, C order.
    servers = []
    serverA = Server(servers, "a", 1235, 1236)
    serverB = Server(servers, "b", 1237, 1238)
    serverC = Server(servers, "c", 1239, 1240)

    for server in servers:
        server.addUser()
    for server in servers:
        server.startPubServer()
    for server in servers:
        server.startSubServer(servers)
    for server in servers:
        server.startBServer()

    bServer = BServer(1234, servers)
    bServer.startBServer()

    time.sleep(0.1)

    testClient()
    
zmq push pull
# -*- coding: utf-8 -*-
'''
Created on 2013-3-15
@author: wave
'''

import zmq
import threading
import time
import pickle

class BWorker(object):
    """Relay stage: copies every message from one localhost port to another.

    Connects a PULL socket to ``pullPort`` and a PUSH socket to ``pushPort``
    and forwards messages between them forever in a background thread.
    """

    def __init__(self, pullPort, pushPort):
        self.pullPort = pullPort
        self.pushPort = pushPort

    def startServer(self):
        """Start the relay loop in a thread named "bHandler"."""
        def relay():
            context = zmq.Context()

            source = context.socket(zmq.PULL)
            source.connect("tcp://localhost:{0}".format(self.pullPort))

            sink = context.socket(zmq.PUSH)
            sink.connect("tcp://localhost:{0}".format(self.pushPort))

            while True:
                sink.send(source.recv())

        threading.Thread(target=relay, name="bHandler", args=()).start()

class RealWorker(object):
    """Final pipeline stage: binds a PULL socket and prints each unpickled message."""

    def __init__(self, pullPort):
        self.pullPort = pullPort

    def startServer(self):
        """Start the receive-and-print loop in a thread named "pullHandler"."""
        def consume():
            context = zmq.Context()
            inbox = context.socket(zmq.PULL)
            # NOTE(review): this binds on all interfaces and unpickles
            # whatever arrives, so anyone who can reach the port can run
            # code here.  Keep it on a trusted network or use a safe format.
            inbox.bind("tcp://*:{0}".format(self.pullPort))

            while True:
                print(pickle.loads(inbox.recv()))

        threading.Thread(target=consume, name="pullHandler", args=()).start()

if __name__ == '__main__':
    # Pipeline: sender (PUSH, binds 5557) -> BWorker relay -> RealWorker (PULL, binds 5558).
    realWorker = RealWorker(5558)
    realWorker.startServer()

    pushPort = 5557
    context = zmq.Context()
    sender = context.socket(zmq.PUSH)
    sender.bind("tcp://*:{0}".format(pushPort))

    bWorker = BWorker(pushPort, realWorker.pullPort)
    bWorker.startServer()

    # Let the relay connect before anything is sent.
    time.sleep(0.5)
    for payload in (["abc", "def"], ["xxx", "yyy"]):
        sender.send(pickle.dumps(payload, -1))
zmq ser
# -*- coding: utf-8 -*-
'''
Created on 2013-3-18
@author: wave
'''
import zlib
import cPickle as pickle

import zmq

class SerializingSocket(zmq.Socket):
    """zmq.Socket with helpers for compressed pickles and raw numpy arrays."""

    def send_zipped_pickle(self, obj, flags=0, protocol=-1):
        """Pickle *obj* (highest protocol by default), zlib-compress it, and send it."""
        return self.send(zlib.compress(pickle.dumps(obj, protocol)), flags=flags)

    def recv_zipped_pickle(self, flags=0):
        """Receive a message produced by send_zipped_pickle and return the object.

        NOTE: unpickling can execute arbitrary code; use only with trusted peers.
        """
        return pickle.loads(zlib.decompress(self.recv(flags)))

    def send_array(self, A, flags=0, copy=True, track=False):
        """Send numpy array *A* as two frames: JSON dtype/shape metadata, then raw data."""
        self.send_json(dict(dtype=str(A.dtype),
                            shape=A.shape), flags | zmq.SNDMORE)
        return self.send(A, flags, copy=copy, track=track)

    def recv_array(self, flags=0, copy=True, track=False):
        """Receive a message produced by send_array and rebuild the numpy array."""
        # numpy is not imported at the top of this snippet; without this
        # import the call below raised NameError.
        import numpy

        md = self.recv_json(flags=flags)
        A = numpy.frombuffer(buffer(self.recv(flags=flags, copy=copy, track=track)),
                             dtype=md['dtype'])
        return A.reshape(md['shape'])

if __name__ == '__main__':
    # numpy is needed by this demo but is not imported at the top of the snippet.
    import numpy

    ctx = zmq.Context.instance()
    req = SerializingSocket(ctx, zmq.REQ)
    rep = SerializingSocket(ctx, zmq.REP)

    rep.bind('inproc://a')
    req.connect('inproc://a')

    A = numpy.ones((1024, 1024))
    print("Array is %i bytes" % A.nbytes)

    # send/recv with pickle+zip
    req.send_zipped_pickle(A)
    B = rep.recv_zipped_pickle()

    # now try non-copying version
    rep.send_array(A, copy=False)
    C = req.recv_array(copy=False)

    print("Checking zipped pickle...")
    print("Okay" if (A == B).all() else "Failed")
    print("Checking send_array...")
    # Compare with the source array, independent of the pickle round-trip.
    print("Okay" if (C == A).all() else "Failed")
zmq heartbeat
# -*- coding: utf-8 -*-
'''
Created on 2013-3-18
@author: wave
'''
import time
import zmq
from zmq.eventloop import ioloop, zmqstream
import threading

class HeartBeater(object):
    """Track which peers ("hearts") answer periodic pings.

    Every *period* ms, beat() sends the current lifetime on *pingstream*.
    Pongs arriving on *pongstream* whose second frame echoes that lifetime
    mark their first frame (the peer identity) as responsive.  A heart that
    stops responding is dropped at the next beat.
    """

    def __init__(self, loop, pingstream, pongstream, period=1000):
        self.loop = loop
        self.period = period

        self.pingstream = pingstream
        self.pongstream = pongstream
        self.pongstream.on_recv(self.handle_pong)

        self.hearts = set()      # identities currently considered alive
        self.responses = set()   # identities that answered the latest ping
        self.lifetime = 0
        self.tic = time.time()

        self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop)
        self.caller.start()

    def beat(self):
        """Reconcile the responses to the previous ping, then send a new ping."""
        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        print(self.lifetime)

        # Compute both sets before self.hearts is mutated; otherwise hearts
        # just added by handle_new_heart would be counted as failures at once.
        goodhearts = self.hearts.intersection(self.responses)
        heartfailures = self.hearts.difference(goodhearts)
        newhearts = self.responses.difference(goodhearts)
        # Explicit loops: map() is lazy on Python 3 and would call nothing.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)

        self.responses = set()
        print("%i beating hearts: %s" % (len(self.hearts), self.hearts))
        self.pingstream.send(str(self.lifetime))

    def handle_new_heart(self, heart):
        print("yay, got new heart %s!" % heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        print("Heart %s failed :(" % heart)
        self.hearts.remove(heart)

    def handle_pong(self, msg):
        """Record msg[0] as responsive if msg[1] echoes the current lifetime."""
        if msg[1] == str(self.lifetime):
            self.responses.add(msg[0])
        else:
            print("got bad heartbeat (possibly old?): %s" % msg[1])

def heartBeatMonitor():
    """Run the monitor: PUB pings on :5555, ROUTER collects pongs on :5556.

    Blocks inside the IOLoop until the loop is stopped.
    """
    loop = ioloop.IOLoop()
    context = zmq.Context()

    pingSocket = context.socket(zmq.PUB)
    pingSocket.bind('tcp://127.0.0.1:5555')

    pongSocket = context.socket(zmq.ROUTER)
    pongSocket.bind('tcp://127.0.0.1:5556')

    HeartBeater(loop,
                zmqstream.ZMQStream(pingSocket, loop),
                zmqstream.ZMQStream(pongSocket, loop))

    loop.start()

def beat():
    """Start one "heart": a thread device that echoes pings back to the monitor.

    A SUB socket (all topics) connected to :5555 feeds a DEALER connected to
    the monitor's ROUTER on :5556.
    """
    device = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.SUB, zmq.DEALER)
    device.setsockopt_in(zmq.SUBSCRIBE, "")
    device.connect_in('tcp://127.0.0.1:5555')
    device.connect_out('tcp://127.0.0.1:5556')
    device.start()
    
if __name__ == '__main__':
    # Start the monitor, give it a moment to bind, then launch ten hearts.
    monitorThread = threading.Thread(target=heartBeatMonitor, args=())
    monitorThread.start()

    time.sleep(0.2)

    for _ in range(10):
        threading.Thread(target=beat, args=()).start()
    
zmq device
# -*- coding: utf-8 -*-
'''
Created on 2013-3-18
@author: wave
'''
import zmq
import os
import threading
import time

def server():
    print 'Server', os.getpid()
    
    def routine(context):
        socket = context.socket(zmq.REP)
    
        socket.connect("inproc://workers")
    
        while True:
            message = socket.recv()
            time.sleep(1)
            socket.send(message)
    
    context = zmq.Context()
    
    workers = context.socket(zmq.DEALER)
    workers.bind("inproc://workers");
    
    clients = context.socket(zmq.DEALER)
    clients.bind('tcp://127.0.0.1:5555')
    
    for i in range(10):
        thread = threading.Thread(target=routine, args=(context,))
        thread.start()
    
    # ZMQ_QUEUE
    #    starts a queue device
    # ZMQ_FORWARDER
    #    starts a forwarder device
    # ZMQ_STREAMER
    #    starts a streamer device
    zmq.device(zmq.QUEUE, clients, workers)
    
    print "Finished"
    
def client():
    print 'Client', os.getpid()
    
    context = zmq.Context()
    
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://127.0.0.1:5555')
    
    while True:
        data = zmq.Message(str(os.getpid()))
        start = time.time()
        socket.send(data)
        data = socket.recv()
        print time.time() - start, data

if __name__ == '__main__':
    # Start the broker first, then one client once it has had time to bind.
    serverThread = threading.Thread(target=server, args=())
    serverThread.start()

    time.sleep(0.1)

    clientThread = threading.Thread(target=client, args=())
    clientThread.start()
python demo share zmq
# -*- coding: utf-8 -*-
'''
Created on 2013-3-14
@author: wave
'''
import threading
import random
import zmq
import time
import pickle

random.seed()


class ThreadManager(object):
    """Registry of the threads created by pullThread() that are running now.

    A thread registers itself with the manager that created it when its
    run() starts.  It deregisters when the target returns *or raises*.
    """

    def __init__(self):
        self.threads = []

    def add(self, extThread):
        self.threads.append(extThread)

    def remove(self, extThread):
        self.threads.remove(extThread)

    def pullThread(self, func=None, args=None, kwargs=None):
        """Return an unstarted thread that will run func(*args, **kwargs).

        While func runs, the thread object is listed in this manager's
        ``threads``.  Call start() on the result to run it.
        """
        manager = self

        class ExtThread(threading.Thread):
            def __init__(self, args=None, kwargs=None):
                if args is None:
                    args = ()
                if kwargs is None:
                    kwargs = {}
                threading.Thread.__init__(self, target=func, args=args, kwargs=kwargs)

                self.args = args
                self.kwargs = kwargs

            def run(self):
                manager.add(self)
                try:
                    func(*self.args, **self.kwargs)
                finally:
                    # Deregister even if func raises, so no stale entries remain.
                    manager.remove(self)

        return ExtThread(args, kwargs)


globalThreadManager = ThreadManager()

#
      
class Server(object):
    def __init__(self, servers, serverID, port, subport, interface='tcp://127.0.0.1'):
        servers.append(self)
        
        self.serverID = serverID
        self.interface = interface
        self.port = port
        self.subport = subport
        
        self.users = []
    
    def addUser(self, users=None):
        if type(users) is list or type(users) is tuple:
            self.users.extend(users)
        elif isinstance(users, User):
            self.users.append(users)
        else:
            pass
        
    def findUser(self, userID):
        for user in self.users:
            if user.userID == userID:
                return user

    def startPubServer(self):
        def pubSide():
            context = zmq.Context()
            socket = context.socket(zmq.PUB)
            socket.bind("{0}:{1}".format(self.interface, self.subport))

            socket.setsockopt(zmq.HWM, 1) # 除了PUB型会在达到高水位丢弃后续数据外,其他类型的都会以阻塞的形式来应对后续数据
            socket.setsockopt(zmq.SWAP, 25000000)   

            time.sleep(1.0)   
            
            self.pubSocket = socket
                
        pubThread = threading.Thread(target=pubSide, name="pubThread", args=())
        pubThread.start()
        
        time.sleep(0.1)

    def startSubServer(self, servers):
        self.subSockets = {}
        def subSide(serverID, subport):
            context = zmq.Context().instance()
            socket = context.socket(zmq.SUB)
            socket.connect("{0}:{1}".format(self.interface, subport))
            
            socket.setsockopt(zmq.SUBSCRIBE, self.serverID)   
            
            self.subSockets[serverID] = socket
            
            while True:
                topic, msg = socket.recv_multipart()
                self.handleSub(pickle.loads(msg))
        
        for server in servers:
            if server != self:
                subThread = threading.Thread(target=subSide, name="subThread", args=(server.serverID, server.subport))
                subThread.start()
        time.sleep(0.1)
    
    def handleSub(self, pyObj):
        fromWho = pyObj[0]
        toWho = pyObj[1]
        
        time.sleep(0.1)
        print "subscribe save ", toWho 
        
        
    def startBServer(self):
        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(self.interface, self.port))
            
            while True :
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)
                
        serverThread = threading.Thread(target=serverSide, name="serverThread", args=())
        serverThread.start()
        
        time.sleep(0.1)
   
    def handle(self, pyObj):
        if isinstance(pyObj, AddUser):
            newUserID = "{0}{1}".format(self.serverID, len(self.users))
            newUser = User(newUserID)
            self.users.append(newUser)
            return newUser
        if isinstance(pyObj, GetAllUsers):  
            return self.users
        
        fromWho = pyObj[0]
        toWho = pyObj[1]
        
        fromUser = self.findUser(fromWho)
        if fromUser:
            print "save ", fromWho
            
            targetUser = self.findUser(toWho)
            if targetUser:
                print "save ", toWho
            else:
                for serverID, _ in self.subSockets.iteritems():
                    if toWho.startswith(serverID):
                        self.pubSocket.send_multipart([serverID, pickle.dumps(pyObj, -1)])
        return "share successed"

class MsgHead(object):
    """Routing metadata meant to accompany a message between servers."""

    def __init__(self):
        # Meant to gain one entry per Server the message passes through, so
        # that message loops inside the system can be detected.
        self.routeSeq = []
        # Intended for permission checks.
        self.lastServer = None
        # Intended for end-to-end tracing of a task.
        self.taskID = None
        # Whether to write logs.
        self.debugLevel = 0

    def validateHead(self):
        """Print a warning ("a message loop occurred") if routeSeq exceeds 10 hops."""
        hops = len(self.routeSeq)
        if hops <= 10:
            return
        print("出现了消息死循环")

class BServer(object):
    """Front-end broker that forwards client requests to back-end Servers.

    Keeps one REQ socket per Server, keyed by serverID, and serves clients
    on its own REP socket (see startBServer).
    """

    def __init__(self, port, servers):
        self.port = port
        self.realSockets = {}
        # One context serves all outgoing sockets; it is kept on the instance
        # so it lives as long as they do.
        self.context = zmq.Context()
        for server in servers:
            socket = self.context.socket(zmq.REQ)
            socket.connect("{0}:{1}".format(server.interface, server.port))

            self.realSockets[server.serverID] = socket

    def handle(self, pyObj):
        """Forward *pyObj* to the right Server and return its reply.

        * AddUser     -> a randomly chosen server.
        * GetAllUsers -> the server whose ID equals pyObj.serverID, or None
          if there is no such server.
        * otherwise   -> pyObj is [fromWho, toWho, what] and goes to the
          server whose ID prefixes fromWho; None if no server matches.
        """
        if isinstance(pyObj, AddUser):
            randInt = random.randint(0, len(self.realSockets) - 1)
            randomRealSocket = list(self.realSockets.values())[randInt]
            randomRealSocket.send_pyobj(pyObj)
            return randomRealSocket.recv_pyobj()

        if isinstance(pyObj, GetAllUsers):
            socket = self.realSockets.get(pyObj.serverID)
            if socket is None:
                # Unknown server: falling through would index the request
                # object below and raise TypeError in the serving thread.
                return None
            socket.send_pyobj(pyObj)
            return socket.recv_pyobj()

        for serverID, socket in self.realSockets.items():
            if pyObj[0].startswith(serverID):
                socket.send_pyobj(pyObj)
                return socket.recv_pyobj()
        return None

    def startBServer(self, interface='tcp://127.0.0.1', port=None):
        """Serve client requests on *interface*:*port* in a background thread.

        *port* defaults to ``self.port``, the port given to the constructor.
        """
        if port is None:
            port = self.port

        def serverSide():
            context = zmq.Context()
            socket = context.socket(zmq.REP)
            socket.bind("{0}:{1}".format(interface, port))

            while True:
                req = socket.recv_pyobj()
                rep = self.handle(req)
                socket.send_pyobj(rep)

        serverThread = threading.Thread(target=serverSide, name="bserverThread", args=())
        serverThread.start()
        
class User(object):
    """A user hosted on a Server; Server.handle derives its ID from the server's ID."""

    def __init__(self, userID):
        self.userID = userID

class Share(object):
    """One item (*what*) shared from user *fromWho* to user *toWho*."""

    def __init__(self, fromWho, toWho, what):
        self.fromWho, self.toWho, self.what = fromWho, toWho, what
        
class AddUser(object):
    """Request to create a new user; Server.handle does not read userName."""

    def __init__(self, userName):
        self.userName = userName

class GetAllUsers(object):
    """Request for the user list of the Server identified by *serverID*."""

    def __init__(self, serverID):
        self.serverID = serverID

def testClient(address=None):
    """Exercise the demo cluster through the BServer front end.

    *address* is the BServer endpoint.  By default it is built from the
    module-level ``interface`` and ``bServer`` globals set up in __main__.

    Creates 20 users and prints each server's user IDs.  It then shares an
    item from the first user on server "a" to the first users on "b" and
    "c", printing the replies.  The client socket is closed on exit.
    """
    if address is None:
        address = "{0}:{1}".format(interface, bServer.port)

    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect(address)
    try:
        # client -> bServer -> server is synchronous (REQ/REP);
        # server <-> server is asynchronous (PUB/SUB).
        for _ in range(20):
            socket.send_pyobj(AddUser("userA"))
            socket.recv_pyobj().userID  # raises unless the reply has a userID

        usersByServer = {}
        for serverID in ("a", "b", "c"):
            socket.send_pyobj(GetAllUsers(serverID))
            usersByServer[serverID] = socket.recv_pyobj()
            print([pyobj.userID for pyobj in usersByServer[serverID]])

        sender = usersByServer["a"][0].userID

        socket.send_pyobj([sender, usersByServer["b"][0].userID, "a book"])
        print(socket.recv_pyobj())

        socket.send_pyobj([sender, usersByServer["c"][0].userID, "another book"])
        print(socket.recv_pyobj())
    finally:
        socket.close()
    
def aaa():
    """Dummy thread target for the ThreadManager demo: sleep for 100 ms."""
    pause = 0.1
    time.sleep(pause)
if __name__ == '__main__':
    # ThreadManager demo: print the registry before start, (probably) while
    # aaa() is running, and after join.
    newThread = globalThreadManager.pullThread(aaa)
    print(globalThreadManager.threads)
    newThread.start()
    print(globalThreadManager.threads)
    newThread.join()
    print(globalThreadManager.threads)

    interface = 'tcp://127.0.0.1'

    # Three back-end servers: (id, REP port, PUB port).  Each constructor
    # appends the server to `servers`, so the loops below run in A, B, C order.
    servers = []
    serverA = Server(servers, "a", 1235, 1236)
    serverB = Server(servers, "b", 1237, 1238)
    serverC = Server(servers, "c", 1239, 1240)

    for server in servers:
        server.addUser()
    for server in servers:
        server.startPubServer()
    for server in servers:
        server.startSubServer(servers)
    for server in servers:
        server.startBServer()

    bServer = BServer(1234, servers)
    bServer.startBServer()

    time.sleep(0.1)

    testClient()
    
python zmq
import zmq

context = zmq.Context()

# PULL stream from :5557 and SUB stream from :5556 filtered on the "10001" prefix.
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")

poller = zmq.Poller()
for sock in (receiver, subscriber):
    poller.register(sock, zmq.POLLIN)

# Read from whichever sockets are ready; receiver is always checked first.
while True:
    socks = dict(poller.poll())
    for sock in (receiver, subscriber):
        if socks.get(sock) == zmq.POLLIN:
            message = sock.recv()
python isportopen
import socket
from string import ascii_letters, digits


# Characters allowed in a host name or dotted IPv4 address. Built once so the
# per-character membership test is an O(1) set lookup instead of a string scan
# of a freshly concatenated string.
_HOST_CHARS = frozenset(ascii_letters + digits + '.-')


def sanitize_host(host):
    """Return *host* reduced to characters valid in a host name.

    The input is first truncated to its first 255 characters, then every
    character that is not an ASCII letter, digit, '.' or '-' is dropped.
    The result may therefore be an empty string.
    """
    return ''.join(c for c in host[:255] if c in _HOST_CHARS)

def isportopen(host, port, timeout=None):
    """Return True if a TCP connection to *host*:*port* succeeds.

    Args:
        host: host name or IPv4 address; it is passed through sanitize_host()
            before use.
        port: port number (int or numeric string) in 1..65535. Values outside
            that range return False; non-numeric values raise ValueError.
        timeout: optional connect timeout in seconds. None (the default)
            leaves the socket's default timeout untouched.

    Returns:
        True when connect_ex() reports success. False otherwise, including
        when the host sanitizes to an empty string or cannot be resolved.
    """
    port = int(port)
    if not 1 <= port <= 65535:
        return False

    target = sanitize_host(host)
    if not target:
        # For AF_INET an empty host string means INADDR_ANY; connecting to it
        # reaches the local machine on many systems, which is not what the
        # caller asked for.
        return False

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if timeout is not None:
            sock.settimeout(timeout)
        return sock.connect_ex((target, port)) == 0
    except socket.error:
        # connect_ex returns connection failures as an errno, but name
        # resolution failures (socket.gaierror) and timeouts are raised.
        return False
    finally:
        # The original never closed the socket, leaking one fd per probe.
        sock.close()

if __name__ == "__main__":
    # Probe port 50170 on a dash-only host name and show the result.
    port_is_open = isportopen("----------", 50170)
    print(port_is_open)
python update dic
import collections


def update(d, u):
    """Recursively merge mapping *u* into dict *d* in place and return *d*.

    For each key in *u*:
      * if the value is a Mapping, it is merged recursively into ``d[key]``.
        A missing, None or non-mapping ``d[key]`` is replaced by a new dict,
        so the nested mappings of *u* are copied rather than shared.
      * otherwise ``d[key]`` is overwritten with the value from *u*.

    Keys present only in *d* are left unchanged.
    """
    try:
        from collections.abc import Mapping
    except ImportError:  # Python 2: the ABCs live directly in `collections`
        from collections import Mapping

    for k, v in u.items():
        if isinstance(v, Mapping):
            current = d.get(k)
            if not isinstance(current, Mapping):
                # The original crashed here when d[k] was e.g. a string or None.
                current = {}
            d[k] = update(current, v)
        else:
            d[k] = v
    return d

if __name__ == "__main__":
    # Target: two nested mappings that will receive the merge.
    a = {
        "a": {"b": "xxxmyu"},
        "b": {"b": "qwerty"},
    }

    # Source: overlaps on "a" (adding key "c") and introduces a new key "c".
    b = {
        "a": {"b": "xxxmyu", "c": "asdas"},
        "c": {"c": "ccccc"},
    }

    update(a, b)
    print(a)
需求只来自你对用户的了解。不来自调研、分析、讨论或竞争对手

        
python version
# Version as a tuple of ints, plus its dotted-string form.
__version_info__ = (0, 11, 1)
__version__ = '.'.join(str(part) for part in __version_info__)
print(__version__)


def versions_report():
    """Yield right-aligned ``"name: version"`` report lines.

    The first line reports the module-level ``__version__`` (labelled
    "Salt"). The second reports the first line of ``sys.version``. Then
    there is one line per optional dependency in ``libs``, showing its
    version, "not installed" if it cannot be imported, or "unknown" if it
    lacks the expected version attribute.
    """
    import sys  # this snippet never imported sys, so the second yield raised NameError

    try:
        string_types = basestring  # Python 2
    except NameError:
        string_types = str  # Python 3

    # (display name, importable module name, version attribute name)
    libs = (
        ("Jinja2", "jinja2", "__version__"),
        ("M2Crypto", "M2Crypto", "version"),
        ("msgpack-python", "msgpack", "version"),
        ("msgpack-pure", "msgpack_pure", "version"),
        ("pycrypto", "Crypto", "__version__"),
        ("PyYAML", "yaml", "__version__"),
        ("PyZMQ", "zmq", "__version__"),
    )

    # Right-align every label to the longest display name, plus one space.
    padding = max(len(name) for name, _, _ in libs) + 1

    fmt = '{0:>{pad}}: {1}'

    yield fmt.format("Salt", __version__, pad=padding)

    yield fmt.format(
        "Python", sys.version.rsplit('\n')[0].strip(), pad=padding
    )

    for name, module_name, attr in libs:
        try:
            module = __import__(module_name)
        except ImportError:
            yield fmt.format(name, "not installed", pad=padding)
            continue

        version = getattr(module, attr, None)
        if version is None:
            # The original raised AttributeError here and aborted the report.
            version = "unknown"
        elif not isinstance(version, string_types):
            # Some libraries expose the version as a tuple such as (0, 4, 2).
            version = '.'.join(map(str, version))
        yield fmt.format(name, version, pad=padding)


if __name__ == '__main__':
    # Print the report one line at a time as the generator produces it.
    for line in versions_report():
        print(line)
python 常见函数
1.常用内置函数:(不用import就可以直接使用)  

   help(obj) 在线帮助, obj可是任何类型

   callable(obj) 查看一个obj是不是可以像函数一样调用

   repr(obj) 得到obj的表示字符串,可以利用这个字符串eval重建该对象的一个拷贝

   eval(str) str是合法的python表达式,对其求值并返回结果

   dir(obj) 查看obj的name space中可见的name

   hasattr(obj,name) 查看一个obj的name space中是否有name

   getattr(obj,name) 得到一个obj的name space中的一个name

   setattr(obj,name,value) 让obj的name space中的name指向value这个object

   delattr(obj,name) 从obj的name space中删除一个name

   vars(obj) 返回一个object的name space。用dictionary表示

   locals() 返回一个局部name space,用dictionary表示

   globals() 返回一个全局name space,用dictionary表示

   type(obj) 查看一个obj的类型

   isinstance(obj,cls) 查看obj是不是cls的instance

   issubclass(subcls,supcls) 查看subcls是不是supcls的子类

   

 类型转换函数

   chr(i) 把一个ASCII数值,变成字符

   ord(c) 把一个字符或者unicode字符,变成对应的整数编码(ASCII码/Unicode码位)

   oct(x) 把整数x变成八进制表示的字符串

   hex(x) 把整数x变成十六进制表示的字符串

   str(obj) 得到obj的字符串描述

   list(seq) 把一个sequence转换成一个list

   tuple(seq) 把一个sequence转换成一个tuple

   dict(),dict(list) 转换成一个dictionary

   int(x) 转换成一个integer

   long(x) 转换成一个long integer

   float(x) 转换成一个浮点数

   complex(x) 转换成复数

   max(...) 求最大值

   min(...) 求最小值

 用于执行程序的内置函数

   compile 如果一段代码经常要使用,那么先编译,再运行会更快。
python包
distribute是setuptools的一个分支(fork),用来替代setuptools
	distribute是对标准库distutils模块的增强
	distutils主要是用来更加容易地打包和分发包,特别是依赖其他包的包

	curl -O http://python-distribute.org/distribute_setup.py
	python distribute_setup.py

pip是easy_install的替代
	curl -O https://raw.github.com/pypa/pip/master/contrib/get-pip.py
	python get-pip.py

	wget http://pypi.python.org/packages/source/p/pip/pip-0.7.2.tar.gz
	tar xzf pip-0.7.2.tar.gz
	cd pip-0.7.2
	python setup.py install

Jinja2
	Jinja是基于python的模板引擎
	http://pypi.python.org/pypi/Jinja2

msgpack-python
	MessagePack是一个高效的二进制对象序列化库,可用于跨语言通信
	
	

M2Crypto
pycrypto
PyYAML


 
pyzmq >= 2.1.9
	消息队列ZeroMQ的Python封装
	追求性能为主的消息队列实现,全部数据在内存中保存
	如果担心数据持久化的问题,可以考虑RabbitMQ 等类似方案
	
Kestrel
	

Pyflakes
PyChecker
PyLint
	Python代码语法检查工具

Durus
	Python对象数据库,也可作为一种对象实例持久化的机制
	开源的纯Python实现,并提供一个可选的C语言插件来大幅提高运行效率

Gluttony
	Python模块之间依赖关系图自动生成工具

Heapy
Dowser
	对Python程序进行内存占用剖析的模块

Coverage.py
	Python测试代码覆盖率统计工具,nose自带的coverage插件即基于它

pyinotify
	利用操作系统自身提供的Notify机制以最高的效率监控文件变化

cairo
	一个支持多种输出的向量图形库
Linux 修改时区tzselect
tzselect

date -s 08/14/12 

date -s 20:59:59 
  
Lua
lua -i -la -lb

dofile("lib1.lua")     

lua -e "print(math.sin(12))"

lua -i -e "_PROMPT=' lua> '"


-- Load the LuaSocket C module from an explicit shared-library path and run
-- its entry point "luaopen_socket".
local path = "/usr/local/lua/lib/libluasocket.so"
-- Lua 5.0 provided a global loadlib; Lua 5.1+ moved it to package.loadlib.
-- On the right-hand side, `loadlib` still refers to the global name.
local loadlib = (package and package.loadlib) or loadlib
-- loadlib returns nil plus an error message on failure; assert raises it.
local f = assert(loadlib(path, "luaopen_socket"))
f()  -- actually open the library
Global site tag (gtag.js) - Google Analytics