Why aren't my consumers consuming?

Tag: python,multiprocessing,python-multithreading

EDIT: I have tracked the problem down to the part of my program that downloads the zip file and parses it. If I comment that part out and feed in a fixed default line instead, it parses that line 10,000 times with no problem.

Not sure how much of this question should be edited to reflect that finding.

I've written a Python program that downloads a zip file containing a single log file of about 10,000 lines. It then parses this file line by line and puts the data into a database.

Eventually my script will run through 200 servers/zip files and process about 100,000 lines. (Not all servers have the needed file.)

However, currently when I run the script with 1 consumer, I only get about 13 lines processed into the database. If I run 2 consumers, I get 24. If I run 10 consumers I get 100, and if I run 20 consumers I get 240.

Sometimes the script ends with "Consumer Finished" and that number of entries in the database (far short of the 10K-30K I'm expecting). Other times, I get an error message:

> Traceback (most recent call last):
>   File "C:\Python27\lib\multiprocessing\", line 262, in _feed
>     send(obj)
> IOError: [Errno 232] The pipe is being closed

What could be causing this problem? Below is a version of my code, modified to remove sensitive data:

import sys
import os
import os.path
import io
import re
import random
import pprint
import traceback
import urllib, urllib2, urlparse
import httplib
import cStringIO, zipfile
from datetime import date, datetime, time, timedelta
import psycopg2
import psycopg2.extras
import requests
from calculon import Calculon

def daterange(start_date, end_date):
  for n in range(int((end_date - start_date).days)):
    yield start_date + timedelta(n)

def producer(args):
  print "Producing!"
  logdt_start = args["logdt_start"]
  logdt_end = args["logdt_end"]
  for single_date in daterange(logdt_start, logdt_end):
    logdt = single_date + timedelta(days=1)
    print "Reading log file..."
    for x in range(1,2):
      servername = "server-{0}".format("%02d" % (x,))
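      # NOTE: the format argument was lost from the post; a date stamp is assumed here
      filename = "zipped_log.log{0}".format(logdt.strftime("%Y-%m-%d"))
      url = "{0}/{1}".format(servername, filename)
      zip_path = 'path/to/file/within/zip/{0}/{1}'.format(servername, filename)
      if httpExists(url):
        try:
          request = urllib2.urlopen(url)
          # Read the whole archive into memory so zipfile can seek in it
          zipinmemory = cStringIO.StringIO(request.read())
          with zipfile.ZipFile(zipinmemory) as archive:
            with archive.open(zip_path) as log:
              print "File Found! Reading %s..." % filename
              for line in log:
                # Hand each raw line to the consumers (assumes the queue is the one passed in as args["_queue"])
                args["_queue"].put(line)
          print "Queue has approximatly {0} items".format(args["_queue"].qsize())
        except Exception:
          print "exception could not load %s" % url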
    return True

def httpExists(url):
    host, path = urlparse.urlsplit(url)[1:3]
    found = 0
    try:
        connection = httplib.HTTPConnection(host)  ## Make HTTPConnection Object
        connection.request("HEAD", path)
        responseOb = connection.getresponse()      ## Grab HTTPResponse Object

        if responseOb.status == 200:
            found = 1
            #print "Status %d %s : %s" % (responseOb.status, responseOb.reason, url)
    except Exception, e:
        print e.__class__,  e, url
    return found

def parse_log(line):
    if len(line) < 10 or line[0] != '[':
        return {}
    mod_line = line
    mod_line = mod_line.replace('  ', ' ') #whats this for?
    query_dict = {}
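    # Timestamp: everything up to the "UTC" marker
    match = re.search('([\d:\/\s]+)\sUTC', mod_line)
    s = match.start()
    e = match.end() - 5
    query_dict['date_ts'] = datetime.strptime(mod_line[s:e], '%d/%m/%Y %H:%M:%S:%f')
    e = e+2
    mod_line = mod_line[e:]
    # Status: the word in front of "Logger:"
    match = re.search('(\w+)\sLogger:\s', mod_line)
    e = match.end()
    query_dict['status'] = match.group(1)
    mod_line = mod_line[e:]
    # The rest of the line is a comma separated list of key=value pairs
    for key_value in re.split(',', mod_line):
        keypair = re.search('(\w+)=(\w+)', key_value)
        key = keypair.group(1)
        value = keypair.group(2)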
        query_dict[key] = value
    return query_dict

def consumer(args):
    global consumed, processed
    consumed += 1
    print "Consumed : {0}".format(consumed)
    db = args["db"]
    cname = args["cname"]
    arg_value = args["_value"]

    cur = db.cursor()
    error_count = 0

    # A None value marks the end of the input
    if arg_value is None:
        print "Consumer Finished!"
        return False
    line = arg_value
    qdict = parse_log(line)

    if len(qdict) == 0:
        print "No data to consumer %s" % cname
        return None

    query = """
    INSERT INTO my_db(date_ts,
        status, cmd, creativeString, environment_id, client_type_id, platform_id, sn_type_id, user_id,
        device_id, invoker_sn_id, invoker_type, poster_sn_id, origin, event_type, creative_id, ch,
         src, afp, cmp, p1, p2,p3)
    VALUES (%(date_ts)s,%(status)s,%(cmd)s,%(creativeString)s,%(environment_id)s,%(client_type_id)s,%(platform_id)s,
    %(sn_type_id)s,%(user_id)s,%(device_id)s,%(invoker_sn_id)s,%(invoker_type)s,%(poster_sn_id)s,%(origin)s,
    %(event_type)s,%(creative_id)s,%(ch)s, %(src)s, %(afp)s, %(cmp)s,
    %(p1)s, %(p2)s, %(p3)s);
    """

    try:
        cur.execute(cur.mogrify(query, qdict))
        processed += 1
        print "processed : {0}".format(processed)
    except Exception:
        error_count = error_count + 1
        print "ERROR in insert {0}".format(error_count)
        print qdict
        print "Error in parsing:  " + line

def main():
  log_start = datetime(2015,1,19)
  log_end = datetime(2015,1,20)
  consumer_args_list = []

  noOfConsumers = 1
  for x in range(0, noOfConsumers):
    print "Creating Consumer {0}".format(x)
    print "Connecting to logs db..."
    db_name = 'mydb'
    connString = "dbname={0} host={1} port={2} user={3} password={4}".format(db_name, 'localhost', 5433, 'postgres', 'pword')
    db = psycopg2.connect(connString)
    consumer_args = {"cname": "CONSUMER_{0}".format(x), "db":db}
    consumer_args_list.append(consumer_args)

  calculon = Calculon( producer,
        [{"logdt_start": log_start,
          "logdt_end": log_end}],
        # the remaining constructor arguments (presumably the consumer
        # function and consumer_args_list) are missing from the post
        )
  result = calculon.start()

consumed = 0
processed = 0
if __name__ == "__main__":
  main()

The output looks like this:

> Creating Consumer 0
Connecting to logs db...
Reading log file...
File Found! Reading log2015-01-20...
Queue has approximatly 9549 items
Consumed : 1
processed : 1
Consumed : 2
processed : 2
Consumed : 3
processed : 3
Consumed : 4
processed : 4
Consumed : 5
processed : 5
Consumed : 6
processed : 6
Consumed : 7
processed : 7
Consumed : 8
processed : 8
Consumed : 9
processed : 9
Consumed : 10
processed : 10
Consumed : 11
processed : 11
Consumed : 12
processed : 12
Consumed : 13
Traceback (most recent call last):
  File "C:\Python27\lib\multiprocessing\", line 262, in _feed
IOError: [Errno 232] The pipe is being closed


The error turned out to be a bad line in the input file, which broke the regular expression.

For example, one of the values in the comma-separated list was: foobar=2, foo=Something here, is ,a really, poor value, bar=2
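To make the failure concrete, here is a minimal sketch of what happens when a line like that is split on commas and each piece is matched against a key=value pattern. The function name split_pairs and the sample strings are made up for illustration, and the pattern is assumed to be the same one used in parse_log. The sketch runs on both Python 2 and Python 3.

```python
import re

def split_pairs(text):
    """Split 'k=v, k=v' text on commas and match each piece as key=value."""
    pairs = {}
    for fragment in text.split(','):
        keypair = re.search(r'(\w+)=(\w+)', fragment)
        # keypair is None when the fragment has no key=value in it,
        # so .group() raises AttributeError on such a fragment
        pairs[keypair.group(1)] = keypair.group(2)
    return pairs

good = "foobar=2, foo=Something, bar=2"
bad = "foobar=2, foo=Something here, is ,a really, poor value, bar=2"

good_result = split_pairs(good)

try:
    split_pairs(bad)
    failed = False
except AttributeError:
    # " is " contains no '=', so re.search returned None
    failed = True
```

An uncaught exception like this inside a consumer stops that consumer, which would fit each consumer only getting through a handful of lines before giving up.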

I was able to fix the problem by adding the following code within the consumer method:

    try:
      qdict = parse_adx_client_log(line)
    except Exception:
      qdict = {}
      print "BAD LINE {0}".format(line)

    if len(qdict) == 0:
       print "No data to consumer %s" % cname
       return None
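The same idea can be sketched without Calculon or a database, using only the standard library. This is an illustration under assumptions, not the actual program: parse_pairs is a hypothetical stand-in for the real parser, the worker runs in a thread rather than a process, and None is used as an end-of-input marker. The point is only that catching the parse error per line lets the worker keep consuming.

```python
import re
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as used in the question

def parse_pairs(line):
    """Hypothetical stand-in for the real parser; raises on a malformed fragment."""
    pairs = {}
    for fragment in line.split(','):
        keypair = re.search(r'(\w+)=(\w+)', fragment)
        pairs[keypair.group(1)] = keypair.group(2)
    return pairs

def worker(q, parsed, bad_lines):
    """Consume lines until the None sentinel arrives, surviving bad lines."""
    while True:
        line = q.get()
        if line is None:
            break
        try:
            parsed.append(parse_pairs(line))
        except AttributeError:
            # Record the bad line and carry on instead of dying
            bad_lines.append(line)

lines = ["a=1, b=2", "a=1, oops here, b=2", "c=3"]
q = queue.Queue()
for line in lines:
    q.put(line)
q.put(None)

parsed, bad_lines = [], []
t = threading.Thread(target=worker, args=(q, parsed, bad_lines))
t.start()
t.join()
```

Keeping the rejected lines in a list (or a log file) makes it easy to see afterwards how many lines were skipped and why.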

