Tag Archives: development

How To Mine Twitter Streams from PHP in Real Time

UPDATE: I’ve wrote a new post with an example on how to connect to the v1.1 Twitter API, using OAuth – here.

Need to mine Twitter for tweets related to certain keywords?

No problem-

Twitter provides a pretty simple streaming interface to the onslaught of tweets it receives, letting you specify whatever keywords you want to search for, in a real-time “live” way.

To do this, I created a simple PHP class that can run in the background, collecting tweets for certain keywords:

ctwitter_stream.php

class ctwitter_stream
{
    private $m_username;
    private $m_password;

    public function __construct()
    {
        //
        // set a time limit to unlimited
        //
        set_time_limit(0);
    }

    //
    // set the login details
    //
    public function login($_username, $_password)
    {
        $this->m_username = $_username;
        $this->m_password = $_password;
    }

    //
    // process a tweet object from the stream
    //
    private function process_tweet(array $_data)
    {
        print_r($_data);

        return true;
    }

    //
    // the main stream manager
    //
    public function start(array $_keywords)
    {
        while(1)
        {
            $fp = fsockopen("ssl://stream.twitter.com", 443, $errno, $errstr, 30);
            if (!$fp)
            {
                echo "ERROR: Twitter Stream Error: failed to open socket";
            } else
            {
                //
                // build the request
                //
                $request  = "GET /1/statuses/filter.json?track=";
                $request .= urlencode(implode($_keywords, ',')) . " HTTP/1.1\r\n";
                $request .= "Host: stream.twitter.com\r\n";
                $request .= "Authorization: Basic ";
                $request .= base64_encode($this->m_username . ':' . $this->m_password);
                $request .= "\r\n\r\n";

                //
                // write the request
                //
                fwrite($fp, $request);

                //
                // set it to non-blocking
                //
                stream_set_blocking($fp, 0);

                while(!feof($fp))
                {
                    $read   = array($fp);
                    $write  = null;
                    $except = null;

                    //
                    // select, waiting up to 10 minutes for a tweet; if we don't get one, then
                    // then reconnect, because it's possible something went wrong.
                    //
                    $res = stream_select($read, $write, $except, 600, 0);
                    if ( ($res == false) || ($res == 0) )
                    {
                        break;
                    }

                    //
                    // read the JSON object from the socket
                    //
                    $json = fgets($fp);
                    if ( ($json !== false) && (strlen($json) > 0) )
                    {
                        //
                        // decode the socket to a PHP array
                        //
                        $data = json_decode($json, true);
                        if ($data)
                        {
                            //
                            // process it
                            //
                            $this->process_tweet($data);
                        }
                    }
                }
            }

            fclose($fp);
            sleep(10);
        }

        return;
    }
};

The “process_tweet()” method will be called for each matching tweet- just modify that method to process the tweet however you want (load it into a database, print it to screen, email it, etc). The keyword matching isn’t perfect- if you search for a string of words, it won’t necessarily match the words in that exact order, but you can check that yourself from the process_tweet() method.

Then create a simple PHP application to run the collector:

require 'ctwitter_stream.php';

$t = new ctwitter_stream();

$t->login('your twitter username', 'your twitter password');

$t->start(array('facebook', 'fbook', 'fb'));

Just provide your twitter account username/password, and then an array of keywords/strings to search for.

Since this application runs continuously in the background, it’s obviously not meant to be run via a web request, but meant to be run from the command line of your Unix or Windows box.

According to the Twitter documentation, the default access level allows up to 400 keywords, so you can track all sorts of things at the same time. If you need more details about the Twitter streaming API, it’s available here.

This class uses the HTTPS PHP stream– so you’ll need the OpenSSL extension enabled for it to work.

Automatic Dial Resource Fail-over in Asterisk

Asterisk is generally pretty reliable, but termination providers aren’t always so good; in a market where anybody can re-sell an upstream provider, or setup a few Asterisk boxes and start routing calls for people, it’s generally a good idea to have a “backup” provider (or three) to route your calls through.

You can easily setup an Asterisk system, to fail-over to secondary systems, if your primary provider fails for some reason- and this can all be done right in the dial plan, using a simple MACRO.

Add this MACRO to your dial plan:

[macro-direct-dial]
exten => s,1,Set(CALL_ATTEMPT=1)
exten => s,2,Set(TERM_PROVIDER=${TERM_PROVIDER1})
exten => s,3,Dial(${TERM_PROVIDER}/${ARG1},60)
exten => s,4,GotoIf($["${CALL_ATTEMPT}" >= "${MAX_PROVIDERS}"]?s-CANCEL,1)
exten => s,5,Set(CALL_ATTEMPT=$[${CALL_ATTEMPT} + 1])
exten => s,6,Goto(s-${DIALSTATUS},1)

exten => s-BUSY,1,Noop()
exten => s-NOANSWER,1,Noop()
exten => s-CANCEL,1,Hangup()
exten => s-HANGUP,1,Hangup()

exten => s-CHANUNAVAIL,1,Set(TERM_PROVIDER=${EVAL(${TERM_PROVIDER${CALL_ATTEMPT}})})
exten => s-CHANUNAVAIL,2,Goto(s,3)

exten => s-CONGESTION,1,Set(TERM_PROVIDER=${EVAL(${TERM_PROVIDER${CALL_ATTEMPT}})})
exten => s-CONGESTION,2,Goto(s,3)

Now you’ll need to route your calls into this MACRO; this can vary by dial plan, as you may have a special configuration for different area codes, or country codes, or based on some least-cost-routing business decisions, but a simple example would be something like this:

[default]
exten => _1NXXNXXXXXX,1,Answer()
exten => _1NXXNXXXXXX,2,Macro(direct-dial,${EXTEN})
exten => _1NXXNXXXXXX,3,Hangup()

This routes any NANPA numbers through the direct-dial MACRO above, passing in the phone number as the first argument to the MACRO.

Now, before this will work, you’ll need to configure some variables; this can be done in many places- in my working configuration, I have these variables dynamically generated via an AGI script, based on the phone number being dialed. This way I can control dial-groups, by phone number, based on a cost/preference/etc.

In this example, we’ll simply set these values in the globals section of the extensions.conf file:

[globals]
TERM_PROVIDER1 = SIP/first_provider
TERM_PROVIDER2 = IAX/second_provider
TERM_PROVIDER3 = SIP/last_provider
MAX_PROVIDERS = 3

So I’ve configured three fictitious termination providers; you can specify as many as you like, as long as the TERM_PROVIDER increments one for each, and you set the MAX_PROVIDERS value to the total number of providers listed.

This is obviously more useful if this list is automatically generated somehow, or changed based on the phone number being dialed, otherwise the retries could simply be hard-coded into the dial plan.

Now when you dial your number, it will start with the first (default) provider; if the dial() function returns a congestion or channel un-available error, the MACRO will cycle to the next provider, until it as gone through all of the providers listed.

Using DKIM in Exim

Since Exim 4.70, DKIM (DomainKeys Indentified Mail – RFC4871) has been supported by default.

The current implementation supports signing outgoing mail, as well as verifying signatures in incoming messages, using the acl_smtp_dkim ACL. By default, DKIM signatures are verified as new messages come in, though no action is taken unless you’ve implicitly configured rules in the DKIM ACL.

After installing Exim (>= 4.70), you should see debug logs for incoming mail from servers that have DKIM signatures setup- they look like:

DKIM: d=gmail.com s=gamma c=relaxed/relaxed a=rsa-sha256 [verification succeeded]
Verifying Incoming Mail

By default, Exim does not filter any mail based on the validity of the DKIM signature- it’s up to you to add ACL rules to control what happens when you receive messages with “bad” signatures.

First add an ACL section for the DKIM processing; this should be included with your other ACL statements:

acl_smtp_dkim = acl_check_dkim

Next, after the “begin acl”, section, add your DKIM ACL section, and by default, accept all messages in this ACL:

acl_check_dkim:

	accept

Now you need to decide what kind of rules you want to setup- you probably don’t want to put a rule that applies to all domains- though, if the company went to the trouble of adding DKIM signatures to their e-mail, you’d hope they’d get it right, and not publish invalid public keys.

For now, let’s add a simple rule for gmail; google knows what they’re doing, so their systems should be setup correctly:

acl_check_dkim:

	#
	# check the DKIM signature for gmail
	#
	deny 	message 	= Common guys, what's going on?
		sender_domains 	= gmail.com
		dkim_signers 	= gmail.com
		dkim_status 	= none:invalid:fail

	accept

You can add as many rules, for whatever domains you want in this ACL.

Signing Outgoing Mail

Now that you’re checking incoming mail, you probably want to sign mail coming out of your system. This is a relatively easy process, that I’ve broken down into three steps:

Step1– Generate a private and public key to sign your messages; you can do this easily with openssl:

#openssl genrsa -out dkim.private.key 1024

Then extract the public key from the private key:

#openssl rsa -in dkim.private.key -out dkim.public.key -pubout -outform PEM

Step2– Configure the Exim remote-smtp transport to sign outgoing messages, using your new private key. You’ll need to pick a domain and a selector for this process.

When remote SMTP servers validate your DKIM signatures, they simply do a DNS look up, based on the selector and your domain- the domain needs to (obviously) be a valid domain you own, that you can add DNS entries to, and the selector can be any string you want. So, for example, using the domain “example.com”, and the selector “x”, you would add to the remote_smtp transport in Exim:

remote_smtp:
        driver = smtp
        dkim_domain = example.com
        dkim_selector = x
        dkim_private_key = dkim.private.key
        dkim_canon = relaxed

This tells Exim to sign any outbound e-mail, using the domain example.com, the selector “x”, and the private key we just generated. The dkim_canon = relaxed, sets the canonicalization method to use when signing messages. DKIM supports “simple” and “relaxed” algorithms- to understand the difference, see section 3.4 of the DKIM RFC.

Step3– add your DKIM public key to your DNS.

The DKIM public key generated above is advertised to other SMTP servers, using a DNS TXT record. In DNS for the domain example.com, add a new TXT record:

x._domainkey.example.com.   TXT v=DKIM1; t=y; k=rsa; p=<public key>

Where “x” is the selector you used above, and <public key> is the public key data (minus the key header/footer text).

When setup correctly, your DKIM text record should look something like this:

# host -t txt x._domainkey.example.com

x._domainkey.example.com descriptive text "v=DKIM1\; t=y\; k=rsa\; p=MIGfMA0GCS
qGSIb3DQEBAQUAA4GNADCBiQKBgQC5k8yUyuyu9UAVHHU7Al4ppTDtxFWsZ6Pqd9NWZnomtewBdz8I
2LJkqmA/3Cyb5Eiaqk4NulPFfDbfA0Lkw7SNyOS9BRN02KGtKIWjFqDwjB99haaWYw9H4IZcuJp0Y
q0kySCdBp/sPP+iTotdBiE85Jakw3tzgYkdvaS05ZUdBwIDAQAB"

(lines breaks were added for readability- your entry should be one continuous line)

This DNS record is referred to as the “selector” record; you need to also setup a “policy” record. The policy record is your domains policy for domain keys- you should start with something like:

_domainkey.example.com. t=y; o=~;

The t=y specifies that you are in test mode and this should be removed when you are certain that your domain key setup is functioning properly. The “~” in the o=~ specifies that some of the mail from your domain is signed, but not all. You could also specify o=- if all of the mail coming from your domain will be signed.

Once you have all of that in-place,  restart Exim, and send out a message using the remote-smtp transport. You should now see a DKIM-Signature: header listed in the message headers, which lists your domain (as d=), and selector (as s=), as well as a signature for this e-mail, which can be validated against your public DKIM key, that you’ve published in DNS.

For more information, see the Exim DKIM page, or the DKIM RFC.

UPDATE:

Once you’ve set everything up, you can test your DKIM (and SPF and SenderID, etc) install, by using the port25.com validation service.

Just send an e-mail to check-auth@verifier.port25.com, and it will auto-respond with a validation report

UPDATE 2:

I’ve updated this to use a key length of at least 1024 bits, otherwise it’s possible to crack the DKIM key, and fake it to show that your email is valid. This came to light largely because of an article about how Google was using a 512 bit key and a “hacker” factored the key, and spoofed emails to the Google founders:

http://www.wired.com/threatlevel/2012/10/dkim-vulnerability-widespread/

I previously listed a key length of 768, which is significantly harder to break than a 512 bit key, but just to be safe, use 1024 or better yet, 2048.

Handling SIP URI Dialing in Asterisk

Asterisk, by design, is very “extension” orientated- that is, if you want to dial an end-point, it requires an extension to route the call to. These extensions (defined in the asterisk extensions.conf file), can be extensions registered to phones, DIDs (XXXYYYZZZZ), or simply usernames assigned to users by the network administrator. Extensions are used for both incoming, and outgoing phone calls.asterisk

For example, if I place a call through my SIP phone to 1-444-555-1212, then asterisk will look up the “extension” 14445551212 in the extensions.conf file, to determine how to route the call.

Similarly to how e-mail works, an artifact of these extensions is a direct SIP address, which is basically your SIP extension @ your SIP server- so, if my phone was extension 555, and my SIP server had the IP address 192.168.1.1, then my SIP address would be: sip:555@192.168.1.1, and (if it’s not implictely blocked), I can be dialed directly using that SIP address.

But, because Asterisk is so extension orientated, it doesn’t easily allow for outbound dialing, using remote SIP addresses; If I try to dial the address sip:444@somedomain.com, Asterisk will immediately strip off the host portion (@somedomain.com), and try to route the call based simply on the extension “444”- which, since it’s an extension on a remote server (@somedomain.com), it won’t be able to route it locally, and will fail.

The solution? Well, it’s not ideal, but Asterisk provides the ability to use wild cards in the extensions.conf file (it refers to them as “patterns“) when doing extension look ups; this is handy when you have blocks of extension or DID’s- you can use the wildcard to map like extensions to the same config, keeping your config file small.

The issue, is that the wild card only compares against the local part of the SIP URI, which can look like almost anything, including other phone numbers.

First, define some general config

[general]
;
; Your termination provider (defined in sip.conf)
;
TERM_PROVIDER = SIP/company_peer

;
; The IP address of this asterisk server
;
ASTERISK_IP = 192.168.1.1

The “ASTERISK_IP” address is important later, as we’ll use it to validate outgoing SIP addresses.

Then in your default config, you should have something like this configured already- handling NANPA style dialing, and international dialing

[default]
;
; Dial NANPA style phone numbers directly
;
exten => _1NXXNXXXXXX,n,Dial(${TERM_PROVIDER}/${EXTEN},60)
exten => _1NXXNXXXXXX,n,HangUp()

;
; Dial international numbers directly
;
exten => _011.,n,Dial(${TERM_PROVIDER}/${EXTEN}, 60)
exten => _011.,n,HangUp()

After all the specific matches are done, then add:

;
; very last, assume anything else is a SIP URI
;
exten => _.,n,GotoIf($[${SIPDOMAIN} = ${ASTERISK_IP}]?unhandled)
exten => _.,n,GotoIf($[${SIPDOMAIN} = ${ASTERISK_IP}:5060]?unhandled)
exten => _.,n,Macro(uri-dial,${EXTEN}@${SIPDOMAIN})
exten => _.,n,HangUp()

;
; if the call doesn't match anything
;
[unhandled]
exten => s,n,Congestion()

The reason this has to be last, is because matching the extension “_.” will match *anything*- basically it’s a catch-all. You’re saying that if it doesn’t match anything else before this, then assume it must be a SIP URI.

This section also compares the ${SIPDOMAIN} variable to your ASTERISK_IP address; this ensures that only SIP URI’s with remote hosts are processed as SIP URI’s. If the host matches our ASTERISK_IP address value (ie- it’s a local extension), then it should have already matched something above this catch-all config.

Pass any SIP URI dials to the uri-dial MACRO, merging back together the extension and the SIP domain value.

;
; handle dialing SIP uri's directly
;
[macro-uri-dial]
exten => s,n,NoOp(Calling as SIP address: ${ARG1})
exten => s,n,Dial(SIP/${ARG1},60)

This solution works, but isn’t ideal, as it will match anything that didn’t already match; a better solution would be for it to NOT strip off the SIP domain, and allow for using a regular expression of some kind to check the extension, but there is currently no better way of handling this in Asterisk.

Memcache Access From PostgreSQL

A big part of my job is to spend time figuring out how new technologies work, build on it, and then figure out new and inventive ways of merging it into new and existing systems- so I spent a lot of time “playing” around with stuff.

One thing I recently threw together, was some simple stored procedures for PostgreSQL, to give us built-in access to our memcached cluster, using the PL/PERL procedural language, and the Cache::Memcached module. None of this is rocket science, but might be useful for somebody out there that is looking to implement something similar.

First off, you need to build PostgreSQL with PERL support, and create the plperlu (un-trusted) language in our database; you need to use the “un-trusted” language, as we need to “use” the Cache::Memcached module;

postgres=# create language plperlu ;
CREATE LANGUAGE

Then you need to install the Cache::Memcached PERL module; I’ll assume you know how to do that; you can either install it through CPAN, or get it from the cpan.org site.

You also need at least one instance of memcached running somewhere; it doesn’t have to be on your local network, it just needs to be accessible by your database.

Then create the following procedures; I’ve only listed the “set” and “get” procedures, but you could easily follow the Cache::Memcached module man page, and create a “replace” and “delete” (and any other) procedures you needed.

So first the “set” procedure:

create or replace function memcache_set(_key text, _value bytea) returns int as $$

        use Cache::Memcached;

        my ($_key, $_value) = @_;

        $m = new Cache::Memcached {
                'debug'	=> 0
        };

        my @list = ("127.0.0.1:9996", "127.0.0.1:9997");
        my $servers = \@list;

        $m->set_servers($servers);
        $m->enable_compress(0);

        if ($m->set($_key, $_value))
        {
                return 0;
        } else
        {
                return 1;
        }

$$ language plperlu;

And then a “get” procedure:

create or replace function memcache_get(_key text) returns bytea as $$

        use Cache::Memcached;       

        my ($_key) = @_;

        $m = new Cache::Memcached {
                'debug'	=> 0
        };

	my @list = ("127.0.0.1:9996", "127.0.0.1:9997");
        my $servers = \@list;       

        $m->set_servers($servers);
        $m->enable_compress(0);

        my $val = $m->get($_key);
        if (defined($val))
        {
                return $val;
        } else
        {
                return undef;
        }

$$ language plperlu;

Now, in this case, I used a list of two different servers, both running on localhost, one on port 9996 and the other on port 9997; these servers could be anywhere, and you could only list one- but a really important thing to remember: if you do list more than one server, make sure you always list it in the same order, between all your procedures and all applications that may use this same data.

memcached calculates which server to store the data on, based on the order of this list; so you’ll get un-expected results if you list these servers differently between your applications.

A Simple Test

Now that you have the functions defined, you can use them to do some really interesting caching, directly from your database. Starting off with a simple test:

postgres=# select memcache_set('foo', 'bar');
 memcache_set 
---------------
               0
(1 row)

Then do a “get” to retrieve the data

postgres=# select memcache_get('foo');
 memcache_get
---------------
 bar
(1 row)

Database Session Handler

Now, lets say you had a database-backed session handler for your website; say maybe a PHP site, using something like this, which simply uses the PHP session_set_save_handler() method. Assume a simple session table like this:

create table sessions (
	session_id char(32) not null,
	data bytea not null default ''
);

“session_id” is the PHP session id (an MD5 string in this case), and “data” is the serialized contents of the PHP session.

Then add two rules- one for insert and one for update; this triggers the new memcache_set() procedure, so that new session data is always pushed out to your memcached servers

create rule session_insert_rule as on insert to sessions
	do also select memcache_set(NEW.session_id::text, NEW.data);

create rule session_update_rule as on update to sessions
	do also select memcache_set(NEW.session_id::text, NEW.data);

Testing this is simple:

postgres=# insert into sessions values ('5cc27b0285c9d2e6dc4125456d308548', 
     'This would be the session data');
INSERT 0 1

postgres=# select memcache_get('5cc27b0285c9d2e6dc4125456d308548');
              memcache_get
--------------------------------
 This would be the session data
(1 row)

And then subsequent updates

postgres=# update sessions set data = 'This is the new data' where
   session_id = '5cc27b0285c9d2e6dc4125456d308548';
UPDATE 1

postgres=# select memcache_get('5cc27b0285c9d2e6dc4125456d308548');
   memcache_get
-------------------
 This is the new data
(1 row)

Now just make your PHP session handler (or whatever) checks memcached first, before selecting from the sessions table; if you build it right, and set your expiration times correctly, you should be able to build a system that (almost) *never* reads this data from the database, as the data is always available from memcached.