diff options
author | Micah Anderson <micah@riseup.net> | 2014-11-11 11:53:55 -0500 |
---|---|---|
committer | Micah Anderson <micah@riseup.net> | 2014-11-11 11:53:55 -0500 |
commit | 7d5c3dcd969161322deed6c43f8a6a3cb92c3369 (patch) | |
tree | 109b05c88c7252d7609ef324d62ef9dd7f06123f /examples/mongodb | |
parent | 44be832c5708baadd146cb954befbc3dcad8d463 (diff) |
upgrade to 14.4.1upstream/14.4.1
Diffstat (limited to 'examples/mongodb')
-rw-r--r-- | examples/mongodb/client.py | 46 | ||||
-rw-r--r-- | examples/mongodb/controller.py | 91 |
2 files changed, 137 insertions, 0 deletions
diff --git a/examples/mongodb/client.py b/examples/mongodb/client.py new file mode 100644 index 0000000..839dce7 --- /dev/null +++ b/examples/mongodb/client.py @@ -0,0 +1,46 @@ +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Justin Riley +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import json +import zmq + +class MongoZMQClient(object): + """ + Client that connects with MongoZMQ server to add/fetch docs + """ + + def __init__(self, connect_addr='tcp://127.0.0.1:5000'): + self._context = zmq.Context() + self._socket = self._context.socket(zmq.DEALER) + self._socket.connect(connect_addr) + + def _send_recv_msg(self, msg): + self._socket.send_multipart(msg) + return self._socket.recv_multipart()[0] + + def get_doc(self, keys): + msg = ['get', json.dumps(keys)] + json_str = self._send_recv_msg(msg) + return json.loads(json_str) + + def add_doc(self, doc): + msg = ['add', json.dumps(doc)] + return self._send_recv_msg(msg) + +def main(): + client = MongoZMQClient() + for i in range(10): + doc = {'job': str(i)} + print "Adding doc", doc + print client.add_doc(doc) + for i in range(10): + query = {'job': str(i)} + print "Getting doc matching query:", query + print client.get_doc(query) + +if __name__ == "__main__": + main() diff --git a/examples/mongodb/controller.py b/examples/mongodb/controller.py new file mode 100644 index 0000000..e154f1c --- /dev/null +++ b/examples/mongodb/controller.py @@ -0,0 +1,91 @@ +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Justin Riley +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import sys +import zmq +import pymongo +import pymongo.json_util +import json + +class MongoZMQ(object): + """ + ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB. + + NOTE: mongod must be started before using this class + """ + + def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"): + """ + bind_addr: address to bind zmq socket on + db_name: name of database to write to (created if doesnt exist) + table_name: name of mongodb 'table' in the db to write to (created if doesnt exist) + """ + self._bind_addr = bind_addr + self._db_name = db_name + self._table_name = table_name + self._conn = pymongo.Connection() + self._db = self._conn[self._db_name] + self._table = self._db[self._table_name] + + def _doc_to_json(self, doc): + return json.dumps(doc,default=pymongo.json_util.default) + + def add_document(self, doc): + """ + Inserts a document (dictionary) into mongo database table + """ + print 'adding docment %s' % (doc) + try: + self._table.insert(doc) + except Exception,e: + return 'Error: %s' % e + + def get_document_by_keys(self, keys): + """ + Attempts to return a single document from database table that matches + each key/value in keys dictionary. + """ + print 'attempting to retrieve document using keys: %s' % keys + try: + return self._table.find_one(keys) + except Exception,e: + return 'Error: %s' % e + + def start(self): + context = zmq.Context() + socket = context.socket(zmq.ROUTER) + socket.bind(self._bind_addr) + while True: + msg = socket.recv_multipart() + print "Received msg: ", msg + if len(msg) != 3: + error_msg = 'invalid message received: %s' % msg + print error_msg + reply = [msg[0], error_msg] + socket.send_multipart(reply) + continue + id = msg[0] + operation = msg[1] + contents = json.loads(msg[2]) + # always send back the id with ROUTER + reply = [id] + if operation == 'add': + self.add_document(contents) + reply.append("success") + elif operation == 'get': + doc = self.get_document_by_keys(contents) + json_doc = self._doc_to_json(doc) + reply.append(json_doc) + else: + print 'unknown request' + socket.send_multipart(reply) + +def main(): + MongoZMQ('ipcontroller','jobs').start() + +if __name__ == "__main__": + main() |