PK |BGk
oops_amqp/publisher.py# Copyright (c) 2011, Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
# GNU Lesser General Public License version 3 (see the file LICENSE).
"""Publish OOPS reports over amqp."""
__metaclass__ = type
from hashlib import md5
from threading import local
from amqplib import client_0_8 as amqp
from anybson import dumps
from utils import (
amqplib_error_types,
is_amqplib_connection_error,
)
__all__ = [
'Publisher',
]
class Publisher:
"""Publish OOPS reports over AMQP.
Messages are published as bson dicts via durable messages sent to a
supplied exchange + routing key.
"""
def __init__(self, connection_factory, exchange_name, routing_key,
inherit_id=False):
"""Create a publisher.
:param connection_factory: A callable which creates an amqplib
Connection when called. This is used to create connections - one
per thread which OOPS publishing happens in. This is because
amqplib is not threadsafe and recommends not sharing connections
across threads.
:param exchange_name: The name of the exchange to publish to.
:param routing_key: The routing key for messages.
:param inherit_id: If True any 'True' 'id' in an OOPS report is
preserved. Handy if an id that has already been shown to a user is
being published (but uniqueness cannot be guaranteed).
"""
self.connection_factory = connection_factory
self.exchange_name = exchange_name
self.routing_key = routing_key
self.channels = local()
self.inherit_id = inherit_id
def get_channel(self):
if getattr(self.channels, 'channel', None) is None:
try:
self.channels.channel = self.connection_factory().channel()
except amqplib_error_types, e:
if is_amqplib_connection_error(e):
# Could not connect
return None
# Unknown error mode : don't hide it.
raise
return self.channels.channel
def __call__(self, report):
# Don't mess with the passed in report.
report = dict(report)
if not self.inherit_id or not report.get('id'):
# Discard any existing id.
original_id = report.pop('id', None)
# Hash it, to make an ID
oops_id = "OOPS-%s" % md5(dumps(report)).hexdigest()
# Store the id in what we send on the wire, so that the recipient
# has it.
report['id'] = oops_id
message = amqp.Message(dumps(report))
# We don't want to drop OOPS on the floor if rabbit is restarted.
message.properties["delivery_mode"] = 2
channel = self.get_channel()
if channel is None:
return []
try:
channel.basic_publish(
message, self.exchange_name, routing_key=self.routing_key)
except amqplib_error_types, e:
self.channels.channel = None
if is_amqplib_connection_error(e):
# Could not connect / interrupted connection
return []
# Unknown error mode : don't hide it.
raise
return [report['id']]
PK |BG
oops_amqp/trace.py# Copyright (c) 2012, Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
# GNU Lesser General Public License version 3 (see the file LICENSE).
"""Trace OOPS reports coming from an AMQP queue."""
from functools import partial
import sys
import optparse
from textwrap import dedent
import amqplib.client_0_8 as amqp
import oops
import oops_amqp
import anybson as bson
def main(argv=None):
if argv is None:
argv=sys.argv
usage = dedent("""\
%prog [options]
The following options must be supplied:
--host
e.g.
oops-amqp-trace --host "localhost:3472"
If you do not have a persistent queue, you should run this script
before generating oopses, as AMQP will discard messages with no
consumers.
""")
description = "Trace OOPS reports coming from an AMQP queue."
parser = optparse.OptionParser(
description=description, usage=usage)
parser.add_option('--host', help="AQMP host / host:port.")
parser.add_option('--username', help="AQMP username.", default="guest")
parser.add_option('--password', help="AQMP password.", default="guest")
parser.add_option('--vhost', help="AMQP vhost.", default="/")
parser.add_option('--exchange', help="AMQP exchange name.", default="oopses")
options, args = parser.parse_args(argv[1:])
def needed(optname):
if getattr(options, optname, None) is None:
raise ValueError('option "%s" must be supplied' % optname)
needed('host')
factory = partial(
amqp.Connection, host=options.host, userid=options.username,
password=options.password, virtual_host=options.vhost)
connection = factory()
channel = connection.channel()
channel.exchange_declare(options.exchange, type="fanout", durable=False,
auto_delete=True)
queue = channel.queue_declare(durable=False, auto_delete=True)[0]
channel.queue_bind(queue, options.exchange)
config = oops.Config()
config.publisher = oops.pprint_to_stream(sys.stdout)
receiver = oops_amqp.Receiver(config, factory, queue)
try:
receiver.run_forever()
except KeyboardInterrupt:
pass
PK |BG3@0x x oops_amqp/receiver.py# Copyright (c) 2011, Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
# GNU Lesser General Public License version 3 (see the file LICENSE).
"""Receive OOPS reports over amqp and republish locally."""
__metaclass__ = type
import time
import anybson as bson
from utils import (
amqplib_error_types,
close_ignoring_connection_errors,
is_amqplib_connection_error,
)
__all__ = [
'Receiver',
]
class Receiver:
"""Republish OOPS reports from AMQP to a local oops.Config.
:ivar stopping: When True will cause Receiver to break out of run_forever.
Calls to run_forever reset this to False.
:ivar sentinel: If a message identical to the sentinel is received,
handle_report will set stopping to True.
"""
def __init__(self, config, connection_factory, queue_name):
"""Create a Receiver.
:param config: An oops.Config to republish the OOPS reports.
:param connection_factory: An amqplib connection factory, used to make
the initial connection and to reconnect if that connection is
interrupted.
:param queue_name: The queue to listen for reports on.
"""
self.config = config
self.connection = None
self.channel = None
self.connection_factory = connection_factory
self.queue_name = queue_name
self.sentinel = None
def handle_report(self, message):
if message.body == self.sentinel:
self.stopping = True
self.channel.basic_ack(message.delivery_tag)
return
try:
report = bson.loads(message.body)
except KeyError:
# Garbage in the queue. Possibly this should raise an OOPS itself
# (through a different config) or log an info level message.
pass
self.config.publish(report)
# ACK last so errors here don't eat the message.
self.channel.basic_ack(message.delivery_tag)
def run_forever(self):
"""Run in a loop handling messages.
If the amqp server is down or uncontactable for > 120 seconds, error
out.
"""
self.stopping = False
self.went_bad = None
while (not self.stopping and
(not self.went_bad or time.time() < self.went_bad + 120)):
try:
self._run_forever()
except amqplib_error_types, e:
if not is_amqplib_connection_error(e):
# Something unknown went wrong.
raise
if not self.went_bad:
self.went_bad = time.time()
# Don't probe immediately, give the network/process time to
# come back.
time.sleep(0.1)
def _run_forever(self):
self.connection = self.connection_factory()
# A successful connection: record this so run_forever won't bail early.
self.went_bad = None
try:
self.channel = self.connection.channel()
try:
self.consume_tag = self.channel.basic_consume(
self.queue_name, callback=self.handle_report)
try:
while True:
self.channel.wait()
if self.stopping:
break
finally:
if self.channel.is_open:
self.channel.basic_cancel(self.consume_tag)
finally:
close_ignoring_connection_errors(self.channel)
finally:
close_ignoring_connection_errors(self.connection)
PK |BG! oops_amqp/utils.py# Copyright (c) 2011, Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
# GNU Lesser General Public License version 3 (see the file LICENSE).
"""Utility functions for oops_amqp."""
import errno
import socket
from amqplib.client_0_8.exceptions import AMQPConnectionException
__all__ = [
'amqplib_error_types',
'close_ignoring_connection_errors',
'is_amqplib_connection_error',
'is_amqplib_ioerror',
]
# These exception types always indicate an AMQP connection error/closure.
# However you should catch amqplib_error_types and post-filter with
# is_amqplib_connection_error.
amqplib_connection_errors = (socket.error, AMQPConnectionException)
# A tuple to reduce duplication in different code paths. Lists the types of
# exceptions legitimately raised by amqplib when the AMQP server goes down.
# Not all exceptions *will* be such errors - use is_amqplib_connection_error to
# do a second-stage filter after catching the exception.
amqplib_error_types = amqplib_connection_errors + (IOError,)
def close_ignoring_connection_errors(closable):
try:
return closable.close()
except amqplib_error_types, e:
if is_amqplib_connection_error(e):
return
raise
def is_amqplib_ioerror(e):
"""Returns True if e is an amqplib internal exception."""
# Raised by amqplib rather than socket.error on ssl issues and short reads.
return type(e) is IOError and e.args == ('Socket error',)
def is_amqplib_connection_error(e):
"""Return True if e was (probably) raised due to a connection issue."""
return isinstance(e, amqplib_connection_errors) or is_amqplib_ioerror(e)
PK jEG+uq oops_amqp/__init__.py#
# Copyright (c) 2011, Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
# GNU Lesser General Public License version 3 (see the file LICENSE).
"""Publish OOPS reports over AMQP.
The oops_amqp package provides an AMQP OOPS http://pypi.python.org/pypi/oops)
publisher, and a small daemon that listens on amqp for OOPS reports and
republishes them (into a supplied publisher). The OOPS framework permits
falling back to additional publishers if AMQP is down.
Usage
=====
Publishing to AMQP
++++++++++++++++++
Where you are creating OOPS reports, configure oops_amqp.Publisher. This takes
a connection factory - a simple callable that creates an amqp
connection - and the exchange name and routing key to submit to.
>>> factory = partial(amqp.Connection, host="localhost:5672",
... userid="guest", password="guest", virtual_host="/", insist=False)
>>> publisher = oops_amqp.Publisher(factory, "oopses", "")
Provide the publisher to your OOPS config::
>>> config = oops.Config()
>>> config.publisher = publisher
Any oops published via that config will now be sent via amqp.
OOPS ids are generating by hashing the oops message (without the id field) -
this ensures unique ids.
The reason a factory is used is because amqplib is not threadsafe - the
publisher maintains a thread locals object to hold the factories and creates
connections when new threads are created(when they first generate an OOPS).
Dealing with downtime
---------------------
From time to time your AMQP server may be unavailable. If that happens then
the Publisher will not assign an oops id - it will return None to signal that
the publication failed. To prevent losing the OOPS its a good idea to have a
fallback publisher - either another AMQP publisher (to a different server) or
one that spools locally (where you can pick up the OOPSes via rsync or some
other mechanism. Using the oops standard helper publish_with_fallback will let
you wrap the fallback publisher so that it only gets invoked if the primary
method failed::
>>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
... userid="guest", password="guest", virtual_host="/", insist=False)
>>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
>>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
Receiving from AMQP
+++++++++++++++++++
There is a simple method that will run an infinite loop processing reports from
AMQP. To use it you need to configure a local config to publish the received
reports. A full config is used because that includes support for filtering
(which can be useful if you need to throttle volume, for instance).
Additionally you need an amqp connection factory (to handle the amqp server
being restarted) and a queue name to receive from.
This example uses the DateDirRepo publisher, telling it to accept whatever
id was assigned by the process publishing to AMQP::
>>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
>>> config = oops.Config()
>>> config.publisher = publisher.publish
>>> receiver = oops_amqp.Receiver(config, factory, "my queue")
>>> receiver.run_forever()
"""
# same format as sys.version_info: "A tuple containing the five components of
# the version number: major, minor, micro, releaselevel, and serial. All
# values except releaselevel are integers; the release level is 'alpha',
# 'beta', 'candidate', or 'final'. The version_info value corresponding to the
# Python version 2.0 is (2, 0, 0, 'final', 0)." Additionally we use a
# releaselevel of 'dev' for unreleased under-development code.
#
# If the releaselevel is 'alpha' then the major/minor/micro components are not
# established at this point, and setup.py will use a version of next-$(revno).
# If the releaselevel is 'final', then the tarball will be major.minor.micro.
# Otherwise it is major.minor.micro~$(revno).
__version__ = (0, 0, 8, 'beta', 1)
__all__ = [
'Publisher',
'Receiver',
]
from oops_amqp.publisher import Publisher
from oops_amqp.receiver import Receiver
PK |BGjGU oops_amqp/anybson.py# Copyright (c) 2012, Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
# GNU Lesser General Public License version 3 (see the file LICENSE).
__all__ = [
'dumps',
'loads',
]
try:
from bson import dumps, loads
except ImportError:
from bson import BSON
def dumps(obj):
return BSON.encode(obj)
def loads(data):
return BSON(data).decode(tz_aware=True)
PK M2H*f + oops_amqp-0.0.8b1.dist-info/DESCRIPTION.rst**************************************************
python-oops-amqp: Transmit error reports over amqp
**************************************************
Copyright (c) 2011, Canonical Ltd
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, version 3 only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
GNU Lesser General Public License version 3 (see the file LICENSE).
The oops_amqp package provides an AMQP OOPS http://pypi.python.org/pypi/oops)
publisher, and a small daemon that listens on amqp for OOPS reports and
republishes them (into a supplied publisher). The OOPS framework permits
falling back to additional publishers if AMQP is down.
Dependencies
============
* Python 2.6+
* bson
* oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
* amqplib
Testing Dependencies
====================
* oops-datedir-repo (http://pypi.python.org/pypi/oops_datedir_repo)
* rabbitfixture (http://pypi.python.org/pypi/rabbitfixture)
* subunit (http://pypi.python.org/pypi/python-subunit) (optional)
* testresources (http://pypi.python.org/pypi/testresources)
* testtools (http://pypi.python.org/pypi/testtools)
Usage
=====
Publishing to AMQP
++++++++++++++++++
Where you are creating OOPS reports, configure oops_amqp.Publisher. This takes
a connection factory - a simple callable that creates an amqp
connection - and the exchange name and routing key to submit to.
>>> factory = partial(amqp.Connection, host="localhost:5672",
... userid="guest", password="guest", virtual_host="/", insist=False)
>>> publisher = oops_amqp.Publisher(factory, "oopses", "")
Provide the publisher to your OOPS config::
>>> config = oops.Config()
>>> config.publisher = publisher
Any oops published via that config will now be sent via amqp.
OOPS ids are generating by hashing the oops message (without the id field) -
this ensures unique ids.
The reason a factory is used is because amqplib is not threadsafe - the
publisher maintains a thread locals object to hold the factories and creates
connections when new threads are created(when they first generate an OOPS).
Dealing with downtime
---------------------
>From time to time your AMQP server may be unavailable. If that happens then
the Publisher will not assign an oops id - it will return None to signal that
the publication failed. To prevent losing the OOPS its a good idea to have a
fallback publisher - either another AMQP publisher (to a different server) or
one that spools locally (where you can pick up the OOPSes via rsync or some
other mechanism. Using the oops standard helper publish_with_fallback will let
you wrap the fallback publisher so that it only gets invoked if the primary
method failed::
>>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
... userid="guest", password="guest", virtual_host="/", insist=False)
>>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
>>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
Receiving from AMQP
+++++++++++++++++++
There is a simple method that will run an infinite loop processing reports from
AMQP. To use it you need to configure a local config to publish the received
reports. A full config is used because that includes support for filtering
(which can be useful if you need to throttle volume, for instance).
Additionally you need an amqp connection factory (to handle the amqp server
being restarted) and a queue name to receive from.
This example uses the DateDirRepo publisher, telling it to accept whatever
id was assigned by the process publishing to AMQP::
>>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
>>> config = oops.Config()
>>> config.publisher = publisher.publish
>>> receiver = oops_amqp.Receiver(config, factory, "my queue")
>>> receiver.run_forever()
For more information see pydoc oops_amqp.
Installation
============
Either run setup.py in an environment with all the dependencies available, or
add the working directory to your PYTHONPATH.
Development
===========
Upstream development takes place at https://launchpad.net/python-oops-amqp.
To setup a working area for development, if the dependencies are not
immediately available, you can use ./bootstrap.py to create bin/buildout, then
bin/py to get a python interpreter with the dependencies available.
To run the tests use the runner of your choice, the test suite is
oops_amqp.tests.test_suite.
For instance::
$ bin/py -m testtools.run oops_amqp.tests.test_suite
If you have testrepository you can run the tests with that::
$ testr run
PK M2H-!: : , oops_amqp-0.0.8b1.dist-info/entry_points.txt[console_scripts]
oops-amqp-trace = oops_amqp.trace:main
PK M2H;)S ) oops_amqp-0.0.8b1.dist-info/metadata.json{"classifiers": ["Development Status :: 2 - Pre-Alpha", "Intended Audience :: Developers", "License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)", "Operating System :: OS Independent", "Programming Language :: Python"], "extensions": {"python.commands": {"wrap_console": {"oops-amqp-trace": "oops_amqp.trace:main"}}, "python.details": {"contacts": [{"email": "launchpad-dev@lists.launchpad.net", "name": "Launchpad Developers", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://launchpad.net/python-oops-amqp"}}, "python.exports": {"console_scripts": {"oops-amqp-trace": "oops_amqp.trace:main"}}}, "extras": ["test"], "generator": "bdist_wheel (0.26.0)", "metadata_version": "2.0", "name": "oops-amqp", "run_requires": [{"requires": ["amqplib", "bson", "oops (>=0.0.11)"]}, {"extra": "test", "requires": ["rabbitfixture", "testresources", "testtools"]}], "summary": "OOPS AMQP transport.", "version": "0.0.8b1"}PK M2HV{s;
) oops_amqp-0.0.8b1.dist-info/top_level.txtoops_amqp
PK M2H''\ \ ! oops_amqp-0.0.8b1.dist-info/WHEELWheel-Version: 1.0
Generator: bdist_wheel (0.26.0)
Root-Is-Purelib: true
Tag: py2-none-any
PK M2Hz#Y[ $ oops_amqp-0.0.8b1.dist-info/METADATAMetadata-Version: 2.0
Name: oops-amqp
Version: 0.0.8b1
Summary: OOPS AMQP transport.
Home-page: https://launchpad.net/python-oops-amqp
Author: Launchpad Developers
Author-email: launchpad-dev@lists.launchpad.net
License: UNKNOWN
Platform: UNKNOWN
Classifier: Development Status :: 2 - Pre-Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Requires-Dist: amqplib
Requires-Dist: bson
Requires-Dist: oops (>=0.0.11)
Provides-Extra: test
Requires-Dist: rabbitfixture; extra == 'test'
Requires-Dist: testresources; extra == 'test'
Requires-Dist: testtools; extra == 'test'
**************************************************
python-oops-amqp: Transmit error reports over amqp
**************************************************
Copyright (c) 2011, Canonical Ltd
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, version 3 only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
GNU Lesser General Public License version 3 (see the file LICENSE).
The oops_amqp package provides an AMQP OOPS http://pypi.python.org/pypi/oops)
publisher, and a small daemon that listens on amqp for OOPS reports and
republishes them (into a supplied publisher). The OOPS framework permits
falling back to additional publishers if AMQP is down.
Dependencies
============
* Python 2.6+
* bson
* oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
* amqplib
Testing Dependencies
====================
* oops-datedir-repo (http://pypi.python.org/pypi/oops_datedir_repo)
* rabbitfixture (http://pypi.python.org/pypi/rabbitfixture)
* subunit (http://pypi.python.org/pypi/python-subunit) (optional)
* testresources (http://pypi.python.org/pypi/testresources)
* testtools (http://pypi.python.org/pypi/testtools)
Usage
=====
Publishing to AMQP
++++++++++++++++++
Where you are creating OOPS reports, configure oops_amqp.Publisher. This takes
a connection factory - a simple callable that creates an amqp
connection - and the exchange name and routing key to submit to.
>>> factory = partial(amqp.Connection, host="localhost:5672",
... userid="guest", password="guest", virtual_host="/", insist=False)
>>> publisher = oops_amqp.Publisher(factory, "oopses", "")
Provide the publisher to your OOPS config::
>>> config = oops.Config()
>>> config.publisher = publisher
Any oops published via that config will now be sent via amqp.
OOPS ids are generating by hashing the oops message (without the id field) -
this ensures unique ids.
The reason a factory is used is because amqplib is not threadsafe - the
publisher maintains a thread locals object to hold the factories and creates
connections when new threads are created(when they first generate an OOPS).
Dealing with downtime
---------------------
>From time to time your AMQP server may be unavailable. If that happens then
the Publisher will not assign an oops id - it will return None to signal that
the publication failed. To prevent losing the OOPS its a good idea to have a
fallback publisher - either another AMQP publisher (to a different server) or
one that spools locally (where you can pick up the OOPSes via rsync or some
other mechanism. Using the oops standard helper publish_with_fallback will let
you wrap the fallback publisher so that it only gets invoked if the primary
method failed::
>>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
... userid="guest", password="guest", virtual_host="/", insist=False)
>>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
>>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
Receiving from AMQP
+++++++++++++++++++
There is a simple method that will run an infinite loop processing reports from
AMQP. To use it you need to configure a local config to publish the received
reports. A full config is used because that includes support for filtering
(which can be useful if you need to throttle volume, for instance).
Additionally you need an amqp connection factory (to handle the amqp server
being restarted) and a queue name to receive from.
This example uses the DateDirRepo publisher, telling it to accept whatever
id was assigned by the process publishing to AMQP::
>>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
>>> config = oops.Config()
>>> config.publisher = publisher.publish
>>> receiver = oops_amqp.Receiver(config, factory, "my queue")
>>> receiver.run_forever()
For more information see pydoc oops_amqp.
Installation
============
Either run setup.py in an environment with all the dependencies available, or
add the working directory to your PYTHONPATH.
Development
===========
Upstream development takes place at https://launchpad.net/python-oops-amqp.
To setup a working area for development, if the dependencies are not
immediately available, you can use ./bootstrap.py to create bin/buildout, then
bin/py to get a python interpreter with the dependencies available.
To run the tests use the runner of your choice, the test suite is
oops_amqp.tests.test_suite.
For instance::
$ bin/py -m testtools.run oops_amqp.tests.test_suite
If you have testrepository you can run the tests with that::
$ testr run
PK M2Hpr< < " oops_amqp-0.0.8b1.dist-info/RECORDoops_amqp/__init__.py,sha256=HE3E1FtHWogQh96Gz-S9FHVvBQGMKVBTJizVRSbDSfI,4626
oops_amqp/anybson.py,sha256=4KKHoFHfvDZKcwKaEYEHJvRFK8vnTltZwYhnHnh6FIk,960
oops_amqp/publisher.py,sha256=ryDy1761fQSiQGh_iNgxFwHLYtpJtOdsJH3yB58KbK0,3850
oops_amqp/receiver.py,sha256=hZVm6K8bU4nSkKOQkfh3TIm7sbidzlCIv3GlqI3kXEk,4216
oops_amqp/trace.py,sha256=hYtS3qD5N2sxByn0CXKhCyWyTRSsVhb42BrCbm7OuRw,2741
oops_amqp/utils.py,sha256=L25QNn6fEtlbqcF9vI1GSHxG8abclKmrDn-bfKECF34,2205
oops_amqp-0.0.8b1.dist-info/DESCRIPTION.rst,sha256=bKlbFBIJ2GgeVni82hnLfkPEIZkTvEvwweVIH2WAYE0,5101
oops_amqp-0.0.8b1.dist-info/METADATA,sha256=wfPzuv0LtbzGlpVhFGGL8zcjP9FOC39ge-clZCg_4V0,5851
oops_amqp-0.0.8b1.dist-info/RECORD,,
oops_amqp-0.0.8b1.dist-info/WHEEL,sha256=JTb7YztR8fkPg6aSjc571Q4eiVHCwmUDlX8PhuuqIIE,92
oops_amqp-0.0.8b1.dist-info/entry_points.txt,sha256=o6NY6NlUcf-g4rea96syhuDJb_RNxF_PfYls6wFFIU4,58
oops_amqp-0.0.8b1.dist-info/metadata.json,sha256=qnHxW304zdZ2iSrJbBDH-VDkLR6yVpzoUpILlHhMNSY,995
oops_amqp-0.0.8b1.dist-info/top_level.txt,sha256=vQRHv43fSQ1GLGbpwzMjcDe_fU6qyptgeMJADDeB8q8,10
PK |BGk
oops_amqp/publisher.pyPK |BG
> oops_amqp/trace.pyPK |BG3@0x x # oops_amqp/receiver.pyPK |BG! * oops_amqp/utils.pyPK jEG+uq 3 oops_amqp/__init__.pyPK |BGjGU E oops_amqp/anybson.pyPK M2H*f + I oops_amqp-0.0.8b1.dist-info/DESCRIPTION.rstPK M2H-!: : , ^ oops_amqp-0.0.8b1.dist-info/entry_points.txtPK M2H;)S ) ^ oops_amqp-0.0.8b1.dist-info/metadata.jsonPK M2HV{s;
) b oops_amqp-0.0.8b1.dist-info/top_level.txtPK M2H''\ \ ! c oops_amqp-0.0.8b1.dist-info/WHEELPK M2Hz#Y[ $ c oops_amqp-0.0.8b1.dist-info/METADATAPK M2Hpr< < " z oops_amqp-0.0.8b1.dist-info/RECORDPK
;